#ifndef ACE_SELECT_REACTOR_T_CPP #define ACE_SELECT_REACTOR_T_CPP #include "ace/Select_Reactor_T.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once #endif /* ACE_LACKS_PRAGMA_ONCE */ #include "ace/ACE.h" #include "ace/Guard_T.h" #include "ace/Log_Category.h" #include "ace/Signal.h" #include "ace/Sig_Handler.h" #include "ace/Thread.h" #include "ace/Timer_Heap.h" #include "ace/OS_NS_errno.h" #include "ace/OS_NS_sys_select.h" #include "ace/OS_NS_sys_stat.h" // For timer_queue_ #include "ace/Recursive_Thread_Mutex.h" #if !defined (__ACE_INLINE__) #include "ace/Select_Reactor_T.inl" #endif /* __ACE_INLINE__ */ ACE_BEGIN_VERSIONED_NAMESPACE_DECL ACE_ALLOC_HOOK_DEFINE_Tc(ACE_Select_Reactor_T) template int ACE_Select_Reactor_T::any_ready (ACE_Select_Reactor_Handle_Set &wait_set) { ACE_TRACE ("ACE_Select_Reactor_T::any_ready"); if (this->mask_signals_) { #if !defined (ACE_WIN32) // Make this call signal safe. ACE_Sig_Guard sb; #endif /* ACE_WIN32 */ return this->any_ready_i (wait_set); } return this->any_ready_i (wait_set); } template int ACE_Select_Reactor_T::any_ready_i (ACE_Select_Reactor_Handle_Set &wait_set) { ACE_TRACE ("ACE_Select_Reactor_T::any_ready_i"); int const number_ready = this->ready_set_.rd_mask_.num_set () + this->ready_set_.wr_mask_.num_set () + this->ready_set_.ex_mask_.num_set (); // number_ready > 0 meaning there are handles in the ready_set // &wait_set != &(this->ready_set_) means that we need to copy // the handles from the ready_set to the wait set because the // wait_set_ doesn't contain all the handles in the ready_set_ if (number_ready > 0 && &wait_set != &(this->ready_set_)) { wait_set.rd_mask_ = this->ready_set_.rd_mask_; wait_set.wr_mask_ = this->ready_set_.wr_mask_; wait_set.ex_mask_ = this->ready_set_.ex_mask_; this->ready_set_.rd_mask_.reset (); this->ready_set_.wr_mask_.reset (); this->ready_set_.ex_mask_.reset (); } return number_ready; } template int ACE_Select_Reactor_T::handler_i (int signum, ACE_Event_Handler **eh) { ACE_TRACE 
("ACE_Select_Reactor_T::handler_i"); ACE_Event_Handler *handler = this->signal_handler_->handler (signum); if (handler == 0) return -1; else if (eh != 0) *eh = handler; return 0; } template bool ACE_Select_Reactor_T::initialized (void) { ACE_TRACE ("ACE_Select_Reactor_T::initialized"); ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, false)); return this->initialized_; } template int ACE_Select_Reactor_T::owner (ACE_thread_t tid, ACE_thread_t *o_id) { ACE_TRACE ("ACE_Select_Reactor_T::owner"); ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); if (o_id) { *o_id = this->owner_; } this->owner_ = tid; return 0; } template int ACE_Select_Reactor_T::owner (ACE_thread_t *t_id) { ACE_TRACE ("ACE_Select_Reactor_T::owner"); ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); *t_id = this->owner_; return 0; } template bool ACE_Select_Reactor_T::restart (void) { ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, false)); return this->restart_; } template bool ACE_Select_Reactor_T::restart (bool r) { ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, false)); bool const current_value = this->restart_; this->restart_ = r; return current_value; } template void ACE_Select_Reactor_T::requeue_position (int rp) { ACE_TRACE ("ACE_Select_Reactor_T::requeue_position"); ACE_MT (ACE_GUARD (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_)); #if defined (ACE_WIN32) ACE_UNUSED_ARG (rp); // Must always requeue ourselves "next" on Win32. 
this->requeue_position_ = 0; #else this->requeue_position_ = rp; #endif /* ACE_WIN32 */ } template int ACE_Select_Reactor_T::requeue_position (void) { ACE_TRACE ("ACE_Select_Reactor_T::requeue_position"); ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); return this->requeue_position_; } template void ACE_Select_Reactor_T::max_notify_iterations (int iterations) { ACE_TRACE ("ACE_Select_Reactor_T::max_notify_iterations"); ACE_MT (ACE_GUARD (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_)); this->notify_handler_->max_notify_iterations (iterations); } template int ACE_Select_Reactor_T::max_notify_iterations (void) { ACE_TRACE ("ACE_Select_Reactor_T::max_notify_iterations"); ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); return this->notify_handler_->max_notify_iterations (); } // Enqueue ourselves into the list of waiting threads. template void ACE_Select_Reactor_T::renew (void) { ACE_TRACE ("ACE_Select_Reactor_T::renew"); #if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0) if (!this->supress_notify_renew ()) this->token_.renew (this->requeue_position_); #endif /* defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0) */ } template int ACE_Select_Reactor_T::notify (ACE_Event_Handler *eh, ACE_Reactor_Mask mask, ACE_Time_Value *timeout) { ACE_TRACE ("ACE_Select_Reactor_T::notify"); // Pass over both the Event_Handler *and* the mask to allow the // caller to dictate which Event_Handler method the receiver // invokes. Note that this call can timeout. ssize_t n = -1; if (this->notify_handler_) { n = this->notify_handler_->notify (eh, mask, timeout); } return n == -1 ? 
-1 : 0; } template int ACE_Select_Reactor_T::resume_handler (ACE_HANDLE handle) { ACE_TRACE ("ACE_Select_Reactor_T::resume_handler"); ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); return this->resume_i (handle); } template int ACE_Select_Reactor_T::suspend_handler (ACE_HANDLE handle) { ACE_TRACE ("ACE_Select_Reactor_T::suspend_handler"); ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); return this->suspend_i (handle); } template int ACE_Select_Reactor_T::suspend_handlers (void) { ACE_TRACE ("ACE_Select_Reactor_T::suspend_handlers"); ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); ACE_Event_Handler *eh = 0; for (ACE_Select_Reactor_Handler_Repository_Iterator iter (&this->handler_rep_); iter.next (eh) != 0; iter.advance ()) { this->suspend_i (eh->get_handle ()); } return 0; } template int ACE_Select_Reactor_T::resume_handlers (void) { ACE_TRACE ("ACE_Select_Reactor_T::resume_handlers"); ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); ACE_Event_Handler *eh = 0; for (ACE_Select_Reactor_Handler_Repository_Iterator iter (&this->handler_rep_); iter.next (eh) != 0; iter.advance ()) { this->resume_i (eh->get_handle ()); } return 0; } template int ACE_Select_Reactor_T::register_handler (ACE_Event_Handler *handler, ACE_Reactor_Mask mask) { ACE_TRACE ("ACE_Select_Reactor_T::register_handler"); ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); return this->register_handler_i (handler->get_handle (), handler, mask); } template int ACE_Select_Reactor_T::register_handler (ACE_HANDLE handle, ACE_Event_Handler *handler, ACE_Reactor_Mask mask) { ACE_TRACE ("ACE_Select_Reactor_T::register_handler"); ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); return this->register_handler_i (handle, handler, mask); } template int ACE_Select_Reactor_T::register_handler (const ACE_Handle_Set &handles, 
ACE_Event_Handler *handler, ACE_Reactor_Mask mask) { ACE_TRACE ("ACE_Select_Reactor_T::register_handler"); ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); return this->register_handler_i (handles, handler, mask); } template ACE_Event_Handler * ACE_Select_Reactor_T::find_handler (ACE_HANDLE handle) { ACE_TRACE ("ACE_Select_Reactor_T::find_handler"); ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, 0)); return this->find_handler_i (handle); } template int ACE_Select_Reactor_T::handler (ACE_HANDLE handle, ACE_Reactor_Mask mask, ACE_Event_Handler **handler) { ACE_TRACE ("ACE_Select_Reactor_T::handler"); ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); return this->handler_i (handle, mask, handler); } template int ACE_Select_Reactor_T::remove_handler (const ACE_Handle_Set &handles, ACE_Reactor_Mask mask) { ACE_TRACE ("ACE_Select_Reactor_T::remove_handler"); ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); return this->remove_handler_i (handles, mask); } template int ACE_Select_Reactor_T::remove_handler (ACE_Event_Handler *handler, ACE_Reactor_Mask mask) { ACE_TRACE ("ACE_Select_Reactor_T::remove_handler"); ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); return this->remove_handler_i (handler->get_handle (), mask); } template int ACE_Select_Reactor_T::remove_handler (ACE_HANDLE handle, ACE_Reactor_Mask mask) { ACE_TRACE ("ACE_Select_Reactor_T::remove_handler"); ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); return this->remove_handler_i (handle, mask); } // Performs operations on the "ready" bits. 
template int ACE_Select_Reactor_T::ready_ops (ACE_HANDLE handle, ACE_Reactor_Mask mask, int ops) { ACE_TRACE ("ACE_Select_Reactor_T::ready_ops"); ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); return this->bit_ops (handle, mask, this->ready_set_, ops); } template int ACE_Select_Reactor_T::open (size_t size, bool restart, ACE_Sig_Handler *sh, ACE_Timer_Queue *tq, int disable_notify_pipe, ACE_Reactor_Notify *notify) { ACE_TRACE ("ACE_Select_Reactor_T::open"); ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); // Can't initialize ourselves more than once. if (this->initialized_) return -1; this->owner_ = ACE_Thread::self (); this->restart_ = restart; this->signal_handler_ = sh; this->timer_queue_ = tq; this->notify_handler_ = notify; int result = 0; // Allows the signal handler to be overridden. if (this->signal_handler_ == 0) { ACE_NEW_RETURN (this->signal_handler_, ACE_Sig_Handler, -1); this->delete_signal_handler_ = true; } // Allows the timer queue to be overridden. if (result != -1 && this->timer_queue_ == 0) { ACE_NEW_RETURN (this->timer_queue_, ACE_Timer_Heap, -1); this->delete_timer_queue_ = true; } // Allows the Notify_Handler to be overridden. if (result != -1 && this->notify_handler_ == 0) { ACE_NEW_RETURN (this->notify_handler_, ACE_Select_Reactor_Notify, -1); this->delete_notify_handler_ = true; } if (result != -1 && this->handler_rep_.open (size) == -1) result = -1; else if (this->notify_handler_->open (this, 0, disable_notify_pipe) == -1) { ACELIB_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("notification pipe open failed"))); result = -1; } if (result != -1) // We're all set to go. this->initialized_ = true; else // This will close down all the allocated resources properly. 
this->close (); return result; } template int ACE_Select_Reactor_T::set_sig_handler (ACE_Sig_Handler *signal_handler) { delete this->signal_handler_; this->signal_handler_ = signal_handler; this->delete_signal_handler_ = false; return 0; } template ACE_Timer_Queue * ACE_Select_Reactor_T::timer_queue () const { return this->timer_queue_; } template int ACE_Select_Reactor_T::timer_queue (ACE_Timer_Queue *tq) { if (this->delete_timer_queue_) { delete this->timer_queue_; } else if (this->timer_queue_) { this->timer_queue_->close (); } this->timer_queue_ = tq; this->delete_timer_queue_ = false; return 0; } template ACE_Select_Reactor_T::ACE_Select_Reactor_T (ACE_Sig_Handler *sh, ACE_Timer_Queue *tq, int disable_notify_pipe, ACE_Reactor_Notify *notify, bool mask_signals, int s_queue) : ACE_Select_Reactor_Impl (mask_signals) , token_ (s_queue) , lock_adapter_ (token_) , deactivated_ (0) { ACE_TRACE ("ACE_Select_Reactor_T::ACE_Select_Reactor_T"); this->token_.reactor (*this); // First try to open the Reactor with the hard-coded default. if (this->open (ACE_Select_Reactor_T::DEFAULT_SIZE, 0, sh, tq, disable_notify_pipe, notify) == -1) { // The hard-coded default Reactor size failed, so attempt to // determine the size at run-time by checking the process file // descriptor limit on platforms that support this feature. // reset the errno so that subsequent checks are valid errno = 0; // There is no need to deallocate resources from previous open() // call since the open() method deallocates any resources prior // to exiting if an error was encountered. // Set the default reactor size to be the current limit on the // number of file descriptors available to the process. This // size is not necessarily the maximum limit. 
if (this->open (ACE::max_handles (), 0, sh, tq, disable_notify_pipe, notify) == -1) ACELIB_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("ACE_Select_Reactor_T::open ") ACE_TEXT ("failed inside ") ACE_TEXT ("ACE_Select_Reactor_T::CTOR"))); } } // Initialize ACE_Select_Reactor_T. template ACE_Select_Reactor_T::ACE_Select_Reactor_T (size_t size, bool restart, ACE_Sig_Handler *sh, ACE_Timer_Queue *tq, int disable_notify_pipe, ACE_Reactor_Notify *notify, bool mask_signals, int s_queue) : ACE_Select_Reactor_Impl (mask_signals) , token_ (s_queue) , lock_adapter_ (token_) , deactivated_ (0) { ACE_TRACE ("ACE_Select_Reactor_T::ACE_Select_Reactor_T"); this->token_.reactor (*this); if (this->open (size, restart, sh, tq, disable_notify_pipe, notify) == -1) ACELIB_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("ACE_Select_Reactor_T::open ") ACE_TEXT ("failed inside ACE_Select_Reactor_T::CTOR"))); } // Close down the ACE_Select_Reactor_T instance, detaching any // remaining Event_Handers. This had better be called from the main // event loop thread... 
template int ACE_Select_Reactor_T::close (void) { ACE_TRACE ("ACE_Select_Reactor_T::close"); ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); if (this->delete_signal_handler_) { delete this->signal_handler_; this->signal_handler_ = 0; this->delete_signal_handler_ = false; } this->handler_rep_.close (); if (this->delete_timer_queue_) { delete this->timer_queue_; this->timer_queue_ = 0; this->delete_timer_queue_ = false; } else if (this->timer_queue_) { this->timer_queue_->close (); this->timer_queue_ = 0; } if (this->notify_handler_ != 0) this->notify_handler_->close (); if (this->delete_notify_handler_) { delete this->notify_handler_; this->notify_handler_ = 0; this->delete_notify_handler_ = false; } this->initialized_ = false; return 0; } template int ACE_Select_Reactor_T::current_info (ACE_HANDLE, size_t &) { return -1; } template ACE_Select_Reactor_T::~ACE_Select_Reactor_T (void) { ACE_TRACE ("ACE_Select_Reactor_T::~ACE_Select_Reactor_T"); this->close (); } template int ACE_Select_Reactor_T::remove_handler_i (const ACE_Handle_Set &handles, ACE_Reactor_Mask mask) { ACE_TRACE ("ACE_Select_Reactor_T::remove_handler_i"); ACE_HANDLE h; ACE_Handle_Set_Iterator handle_iter (handles); while ((h = handle_iter ()) != ACE_INVALID_HANDLE) if (this->remove_handler_i (h, mask) == -1) return -1; return 0; } template int ACE_Select_Reactor_T::register_handler_i (const ACE_Handle_Set &handles, ACE_Event_Handler *handler, ACE_Reactor_Mask mask) { ACE_TRACE ("ACE_Select_Reactor_T::register_handler_i"); ACE_HANDLE h; ACE_Handle_Set_Iterator handle_iter (handles); while ((h = handle_iter ()) != ACE_INVALID_HANDLE) if (this->register_handler_i (h, handler, mask) == -1) return -1; return 0; } template int ACE_Select_Reactor_T::register_handler (const ACE_Sig_Set &sigset, ACE_Event_Handler *new_sh, ACE_Sig_Action *new_disp) { ACE_TRACE ("ACE_Select_Reactor_T::register_handler"); int result = 0; #if (ACE_NSIG > 0) for (int s = 1; s < ACE_NSIG; ++s) if 
((sigset.is_member (s) == 1) && this->signal_handler_->register_handler (s, new_sh, new_disp) == -1) result = -1; #else /* ACE_NSIG <= 0 */ ACE_UNUSED_ARG (sigset); ACE_UNUSED_ARG (new_sh); ACE_UNUSED_ARG (new_disp); #endif /* ACE_NSIG <= 0 */ return result; } template int ACE_Select_Reactor_T::remove_handler (const ACE_Sig_Set &sigset) { ACE_TRACE ("ACE_Select_Reactor_T::remove_handler"); int result = 0; #if (ACE_NSIG > 0) for (int s = 1; s < ACE_NSIG; ++s) if ((sigset.is_member (s) == 1) && this->signal_handler_->remove_handler (s) == -1) result = -1; #else /* ACE_NSIG <= 0 */ ACE_UNUSED_ARG (sigset); #endif /* ACE_NSIG <= 0 */ return result; } template int ACE_Select_Reactor_T::cancel_timer (ACE_Event_Handler *handler, int dont_call_handle_close) { ACE_TRACE ("ACE_Select_Reactor_T::cancel_timer"); ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); if ((this->timer_queue_ != 0) && (handler != 0)) return this->timer_queue_->cancel (handler, dont_call_handle_close); else return 0; } template int ACE_Select_Reactor_T::cancel_timer (long timer_id, const void **arg, int dont_call_handle_close) { ACE_TRACE ("ACE_Select_Reactor_T::cancel_timer"); ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); if (this->timer_queue_ != 0) return this->timer_queue_->cancel (timer_id, arg, dont_call_handle_close); else return 0; } template long ACE_Select_Reactor_T::schedule_timer (ACE_Event_Handler *handler, const void *arg, const ACE_Time_Value &delay_time, const ACE_Time_Value &interval) { ACE_TRACE ("ACE_Select_Reactor_T::schedule_timer"); ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); if (0 != this->timer_queue_) return this->timer_queue_->schedule (handler, arg, timer_queue_->gettimeofday () + delay_time, interval); errno = ESHUTDOWN; return -1; } template int ACE_Select_Reactor_T::reset_timer_interval (long timer_id, const ACE_Time_Value &interval) { ACE_TRACE 
("ACE_Select_Reactor_T::reset_timer_interval"); ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); if (0 != this->timer_queue_) { return this->timer_queue_->reset_interval (timer_id, interval); } errno = ESHUTDOWN; return -1; } // Main event loop driver that blocks for before // returning (will return earlier if I/O or signal events occur). template int ACE_Select_Reactor_T::handle_events (ACE_Time_Value &max_wait_time) { ACE_TRACE ("ACE_Select_Reactor_T::handle_events"); return this->handle_events (&max_wait_time); } template int ACE_Select_Reactor_T::handle_error (void) { ACE_TRACE ("ACE_Select_Reactor_T::handle_error"); #if defined (ACE_LINUX) && defined (ERESTARTNOHAND) int const error = errno; // Avoid multiple TSS accesses. if (error == EINTR || error == ERESTARTNOHAND) return this->restart_; #else if (errno == EINTR) return this->restart_; #endif /* ACE_LINUX && ERESTARTNOHAND */ #if defined (__MVS__) || defined (ACE_WIN32) || defined (ACE_VXWORKS) // On MVS Open Edition and Win32, there can be a number of failure // codes on a bad socket, so check_handles on anything other than // EINTR. VxWorks doesn't even bother to always set errno on error // in select (specifically, it doesn't return EBADF for bad FDs). else return this->check_handles (); #else else if (errno == EBADF) return this->check_handles (); else return -1; #endif /* __MVS__ || ACE_WIN32 */ } template void ACE_Select_Reactor_T::notify_handle (ACE_HANDLE handle, ACE_Reactor_Mask mask, ACE_Handle_Set &ready_mask, ACE_Event_Handler *event_handler, ACE_EH_PTMF ptmf) { ACE_TRACE ("ACE_Select_Reactor_T::notify_handle"); // Check for removed handlers. if (event_handler == 0) return; bool const reference_counting_required = event_handler->reference_counting_policy ().value () == ACE_Event_Handler::Reference_Counting_Policy::ENABLED; // Call add_reference() if needed. 
if (reference_counting_required) { event_handler->add_reference (); } int const status = (event_handler->*ptmf) (handle); if (status < 0) this->remove_handler_i (handle, mask); else if (status > 0) ready_mask.set_bit (handle); // Call remove_reference() if needed. if (reference_counting_required) event_handler->remove_reference (); } // Perform GET, CLR, SET, and ADD operations on the select() // Handle_Sets. // // GET = 1, Retrieve current value // SET = 2, Set value of bits to new mask (changes the entire mask) // ADD = 3, Bitwise "or" the value into the mask (only changes // enabled bits) // CLR = 4 Bitwise "and" the negation of the value out of the mask // (only changes enabled bits) // // Returns the original mask. template int ACE_Select_Reactor_T::mask_ops (ACE_HANDLE handle, ACE_Reactor_Mask mask, int ops) { ACE_TRACE ("ACE_Select_Reactor_T::mask_ops"); ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); // If the handle is not suspended, then set the ops on the // , otherwise set the . if (this->is_suspended_i (handle)) return this->bit_ops (handle, mask, this->suspend_set_, ops); else return this->bit_ops (handle, mask, this->wait_set_, ops); } template ACE_Event_Handler * ACE_Select_Reactor_T::find_handler_i (ACE_HANDLE handle) { ACE_TRACE ("ACE_Select_Reactor_T::find_handler_i"); ACE_Event_Handler *event_handler = this->handler_rep_.find (handle); if (event_handler) { event_handler->add_reference (); } return event_handler; } // Must be called with locks held. 
// Check whether <handle> is registered for the events in <mask>.
// Returns -1 if no handler is found for <handle>, or if a requested
// READ/ACCEPT, WRITE or EXCEPT bit is not set for the handle in the
// corresponding wait_set_ mask.  Otherwise returns 0 and, when eh is
// non-null, stores the handler in *eh after calling add_reference()
// on it.
template int
ACE_Select_Reactor_T::handler_i (ACE_HANDLE handle,
                                 ACE_Reactor_Mask mask,
                                 ACE_Event_Handler **eh)
{
  ACE_TRACE ("ACE_Select_Reactor_T::handler_i");
  ACE_Event_Handler *event_handler = this->handler_rep_.find (handle);

  if (event_handler == 0)
    return -1;
  else
    {
      if ((ACE_BIT_ENABLED (mask, ACE_Event_Handler::READ_MASK)
           || ACE_BIT_ENABLED (mask, ACE_Event_Handler::ACCEPT_MASK))
          && this->wait_set_.rd_mask_.is_set (handle) == 0)
        return -1;
      if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::WRITE_MASK)
          && this->wait_set_.wr_mask_.is_set (handle) == 0)
        return -1;
      if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::EXCEPT_MASK)
          && this->wait_set_.ex_mask_.is_set (handle) == 0)
        return -1;
    }

  if (eh != 0)
    {
      *eh = event_handler;
      event_handler->add_reference ();
    }

  return 0;
}

// Must be called with locks held
//
// Move every bit that is set for <handle> in suspend_set_ (read,
// write, except) back into wait_set_.  Returns -1 if <handle> has no
// registered handler, otherwise 0.
template int
ACE_Select_Reactor_T::resume_i (ACE_HANDLE handle)
{
  ACE_TRACE ("ACE_Select_Reactor_T::resume_i");
  if (this->handler_rep_.find (handle) == 0)
    return -1;

  if (this->suspend_set_.rd_mask_.is_set (handle))
    {
      this->wait_set_.rd_mask_.set_bit (handle);
      this->suspend_set_.rd_mask_.clr_bit (handle);
    }
  if (this->suspend_set_.wr_mask_.is_set (handle))
    {
      this->wait_set_.wr_mask_.set_bit (handle);
      this->suspend_set_.wr_mask_.clr_bit (handle);
    }
  if (this->suspend_set_.ex_mask_.is_set (handle))
    {
      this->wait_set_.ex_mask_.set_bit (handle);
      this->suspend_set_.ex_mask_.clr_bit (handle);
    }
  return 0;
}

// Must be called with locks held
//
// Mirror image of resume_i(): move every bit that is set for <handle>
// in wait_set_ into suspend_set_, then clear the handle from the
// dispatch mask.  Returns -1 if <handle> has no registered handler,
// otherwise 0.
template int
ACE_Select_Reactor_T::suspend_i (ACE_HANDLE handle)
{
  ACE_TRACE ("ACE_Select_Reactor_T::suspend_i");
  if (this->handler_rep_.find (handle) == 0)
    return -1;

  if (this->wait_set_.rd_mask_.is_set (handle))
    {
      this->suspend_set_.rd_mask_.set_bit (handle);
      this->wait_set_.rd_mask_.clr_bit (handle);
    }
  if (this->wait_set_.wr_mask_.is_set (handle))
    {
      this->suspend_set_.wr_mask_.set_bit (handle);
      this->wait_set_.wr_mask_.clr_bit (handle);
    }
  if (this->wait_set_.ex_mask_.is_set (handle))
    {
      this->suspend_set_.ex_mask_.set_bit (handle);
      this->wait_set_.ex_mask_.clr_bit (handle);
    }

  // Kobi: we need to remove that handle from the
  // dispatch set as well. We use that function with all the relevant
  // masks - rd/wr/ex - all the mask. it is completely suspended
  this->clear_dispatch_mask (handle, ACE_Event_Handler::RWE_MASK);
  return 0;
}

// Must be called with locks held
//
// Returns 0 if <handle> has no registered handler; otherwise non-zero
// when the handle is set in any of the three suspend_set_ masks.
template int
ACE_Select_Reactor_T::is_suspended_i (ACE_HANDLE handle)
{
  ACE_TRACE ("ACE_Select_Reactor_T::is_suspended_i");
  if (this->handler_rep_.find (handle) == 0)
    return 0;

  return this->suspend_set_.rd_mask_.is_set (handle)
    || this->suspend_set_.wr_mask_.is_set (handle)
    || this->suspend_set_.ex_mask_.is_set (handle);
}

// Must be called with locks held
//
// Returns the result of binding the registration in the handler
// repository.
template int
ACE_Select_Reactor_T::register_handler_i (ACE_HANDLE handle,
                                          ACE_Event_Handler *event_handler,
                                          ACE_Reactor_Mask mask)
{
  ACE_TRACE ("ACE_Select_Reactor_T::register_handler_i");

  // Insert the (handle, event_handler, mask) tuple into the Handler
  // Repository.
  return this->handler_rep_.bind (handle, event_handler, mask);
}

// Returns the result of unbinding <handle>/<mask> from the handler
// repository.
template int
ACE_Select_Reactor_T::remove_handler_i (ACE_HANDLE handle,
                                        ACE_Reactor_Mask mask)
{
  ACE_TRACE ("ACE_Select_Reactor_T::remove_handler_i");

  // Unbind this handle.
  return this->handler_rep_.unbind (handle, mask);
}

// Poll for pending work without dispatching it: select() is run on a
// copy of wait_set_ for at most the timeout computed from
// <max_wait_time> and the timer queue.  Returns 0 at once if the
// reactor is deactivated, -1 if the guard fails, 1 when select()
// reports nothing but a timer is pending, otherwise the select()
// result.
template int
ACE_Select_Reactor_T::work_pending (const ACE_Time_Value &max_wait_time)
{
  ACE_TRACE ("ACE_Select_Reactor_T::work_pending");

  // Work on a private copy; max_wait_time itself is const.
  ACE_Time_Value mwt (max_wait_time);
  ACE_MT (ACE_Countdown_Time countdown (&mwt));

  ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN,
                            ace_mon,
                            this->token_,
                            -1));

  if (this->deactivated_)
    return 0;

  // Update the countdown to reflect time waiting for the mutex.
  ACE_MT (countdown.update ());

  // NOTE(review): timer_queue_ is dereferenced without a null check
  // here, whereas schedule_timer() tests it first and close() can set
  // it to 0 -- confirm it cannot be null at this point.
  ACE_Time_Value timer_buf (0);
  ACE_Time_Value *this_timeout =
    this->timer_queue_->calculate_timeout (&mwt, &timer_buf);

  // Check if we have timers to fire.
  bool const timers_pending =
    (this_timeout != 0 && *this_timeout != mwt ? true : false);

#ifdef ACE_WIN32
  // This arg is ignored on Windows and causes pointer truncation
  // warnings on 64-bit compiles.
  int const width = 0;
#else
  int const width = this->handler_rep_.max_handlep1 ();
#endif  /* ACE_WIN32 */

  // select() is given copies so that wait_set_ itself is not modified.
  ACE_Select_Reactor_Handle_Set fd_set;
  fd_set.rd_mask_ = this->wait_set_.rd_mask_;
  fd_set.wr_mask_ = this->wait_set_.wr_mask_;
  fd_set.ex_mask_ = this->wait_set_.ex_mask_;

  int const nfds = ACE_OS::select (width,
                                   fd_set.rd_mask_,
                                   fd_set.wr_mask_,
                                   fd_set.ex_mask_,
                                   this_timeout);

  // If timers are pending, override any timeout from the select()
  // call.
  return (nfds == 0 && timers_pending ? 1 : nfds);
}

// Must be called with lock held.
//
// NOTE(review): this copy of the file is damaged from here on.  The
// body of wait_for_multiple_events() breaks off in the middle of a
// comment, and the text that follows is the tail of a different
// function: it uses active_handle_count, signal_occurred,
// other_handlers_dispatched, io_handlers_dispatched and
// initial_handle_count, none of which are declared in the visible
// text, and it closes a do/while loop whose opening is not visible.
// Whatever stood in between (presumably the rest of this function and
// the dispatch_* helpers called below) has to be restored from the
// upstream source before this region can be compiled or reviewed.
template int
ACE_Select_Reactor_T::wait_for_multiple_events (ACE_Select_Reactor_Handle_Set &dispatch_set,
                                                ACE_Time_Value *max_wait_time)
{
  ACE_TRACE ("ACE_Select_Reactor_T::wait_for_multiple_events");
  ACE_Time_Value timer_buf (0);
  ACE_Time_Value *this_timeout = 0;

  int number_of_active_handles = this->any_ready (dispatch_set);

  // If there are any bits enabled in the ready_set_ then we'll
  // handle those first, otherwise we'll block in
  // [... text missing in this copy ...] was interrupted.
          if (ACE_Sig_Handler::sig_pending () != 0)
            {
              ACE_Sig_Handler::sig_pending (0);

              // If any HANDLES in the ready_set_ are activated as a
              // result of signals they should be dispatched since
              // they may be time critical...
              active_handle_count = this->any_ready (dispatch_set);

              // Record the fact that the Reactor has dispatched a
              // handle_signal() method.  We need this to return the
              // appropriate count below.
              signal_occurred = 1;
            }
          else
            return -1;
        }

      // Handle timers early since they may have higher latency
      // constraints than I/O handlers.  Ideally, the order of
      // dispatching should be a strategy...
      else if (this->dispatch_timer_handlers (other_handlers_dispatched) == -1)
        // State has changed or timer queue has failed, exit loop.
        break;

      // Check to see if there are no more I/O handles left to
      // dispatch AFTER we've handled the timers...
      else if (active_handle_count == 0)
        return io_handlers_dispatched
          + other_handlers_dispatched
          + signal_occurred;

      // Next dispatch the notification handlers (if there are any to
      // dispatch).  These are required to handle multi-threads that
      // are trying to update the reactor.  (NOTE(review): the name
      // at the end of that sentence was lost from this copy --
      // confirm.)

      else if (this->dispatch_notification_handlers
               (dispatch_set,
                active_handle_count,
                other_handlers_dispatched) == -1)
        // State has changed or a serious failure has occurred, so exit
        // loop.
        break;

      // Finally, dispatch the I/O handlers.
      else if (this->dispatch_io_handlers
               (dispatch_set,
                active_handle_count,
                io_handlers_dispatched) == -1)
        // State has changed, so exit loop.
        break;

      // if state changed, we need to re-eval active_handle_count,
      // so we will not end with an endless loop
      if (initial_handle_count == active_handle_count
          || this->state_changed_)
      {
        active_handle_count = this->any_ready (dispatch_set);
      }
    }
  while (active_handle_count > 0);

  return io_handlers_dispatched + other_handlers_dispatched + signal_occurred;
}

// Used as the filter expression of the ACE_SEH_EXCEPT in
// handle_events_i().  On Win32 it releases the token and returns
// EXCEPTION_CONTINUE_SEARCH; elsewhere it just returns 0.
template int
ACE_Select_Reactor_T::release_token (void)
{
#if defined (ACE_WIN32)
  this->token_.release ();
  return (int) EXCEPTION_CONTINUE_SEARCH;
#else
  return 0;
#endif /* ACE_WIN32 */
}

// Run one pass of the event loop, waiting at most *max_wait_time.
// In multi-threaded builds the token is acquired first and the call
// fails with errno = EACCES unless the calling thread is owner_.  A
// deactivated reactor fails with errno = ESHUTDOWN.  Both failures
// return -1; otherwise the result of handle_events_i() is returned.
template int
ACE_Select_Reactor_T::handle_events (ACE_Time_Value *max_wait_time)
{
  ACE_TRACE ("ACE_Select_Reactor_T::handle_events");

  // Stash the current time -- the destructor of this object will
  // automatically compute how much time elapsed since this method was
  // called.
  ACE_Countdown_Time countdown (max_wait_time);

#if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0)
  ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1);

  if (ACE_OS::thr_equal (ACE_Thread::self (), this->owner_) == 0)
    {
      errno = EACCES;
      return -1;
    }
  if (this->deactivated_)
    {
      errno = ESHUTDOWN;
      return -1;
    }

  // Update the countdown to reflect time waiting for the mutex.
  countdown.update ();
#else
  if (this->deactivated_)
    {
      errno = ESHUTDOWN;
      return -1;
    }
#endif /* ACE_MT_SAFE */

  return this->handle_events_i (max_wait_time);
}

// Clear dispatch_set_, wait for events into it and dispatch them.
// Returns the dispatch() result; the initial -1 is what remains if the
// guarded section does not run to completion.
template int
ACE_Select_Reactor_T::handle_events_i (ACE_Time_Value *max_wait_time)
{
  int result = -1;

  ACE_SEH_TRY
    {
      // We use the data member dispatch_set_ as the current dispatch
      // set.

      // We need to start from a clean dispatch_set
      this->dispatch_set_.rd_mask_.reset ();
      this->dispatch_set_.wr_mask_.reset ();
      this->dispatch_set_.ex_mask_.reset ();

      int number_of_active_handles =
        this->wait_for_multiple_events (this->dispatch_set_,
                                        max_wait_time);

      result =
        this->dispatch (number_of_active_handles,
                        this->dispatch_set_);
    }
  ACE_SEH_EXCEPT (this->release_token ())
    {
      // As it stands now, we catch and then rethrow all Win32
      // structured exceptions so that we can make sure to release the
      // lock correctly.
    }

  return result;
}

// Probe every handle in the union of the three wait_set_ masks and
// remove the handlers of those that turn out to be bad.  Handles are
// probed with a zero-timeout select() on Win32, MVS and VxWorks and
// with fstat() elsewhere.  Returns 1 if at least one handle was
// removed, otherwise 0.
template int
ACE_Select_Reactor_T::check_handles (void)
{
  ACE_TRACE ("ACE_Select_Reactor_T::check_handles");

#if defined (ACE_WIN32) || defined (__MVS__) || defined (ACE_VXWORKS)
  ACE_Time_Value time_poll = ACE_Time_Value::zero;
  ACE_Handle_Set rd_mask;
#endif /* ACE_WIN32 || MVS || ACE_VXWORKS */

  int result = 0;

  /*
   * It's easier to run through the handler repository iterator, but that
   * misses handles that are registered on a handler that doesn't implement
   * get_handle(). So, build a handle set that's the union of the three
   * wait_sets (rd, wrt, ex) and run through that. Bad handles get cleared
   * out of all sets.
   */
  ACE_HANDLE h;
  ACE_Handle_Set check_set (this->wait_set_.rd_mask_);
  ACE_Handle_Set_Iterator wr_iter (this->wait_set_.wr_mask_);
  while ((h = wr_iter ()) != ACE_INVALID_HANDLE)
    check_set.set_bit (h);
  ACE_Handle_Set_Iterator ex_iter (this->wait_set_.ex_mask_);
  while ((h = ex_iter ()) != ACE_INVALID_HANDLE)
    check_set.set_bit (h);

  ACE_Handle_Set_Iterator check_iter (check_set);
  while ((h = check_iter ()) != ACE_INVALID_HANDLE)
    {

#if defined (ACE_WIN32) || defined (__MVS__) || defined (ACE_VXWORKS)
      // Win32 needs to do the check this way because fstat won't work on
      // a socket handle.  MVS Open Edition needs to do it this way because,
      // even though the docs say to check a handle with either select or
      // fstat, the fstat method always says the handle is ok.
      // pSOS needs to do it this way because file handles and socket handles
      // are maintained by separate pieces of the system.  VxWorks needs the select
      // variant since fstat always returns an error on socket FDs.
      rd_mask.set_bit (h);

#  if defined (ACE_WIN32)
      // This arg is ignored on Windows and causes pointer truncation
      // warnings on 64-bit compiles.
      int select_width = 0;
#  else
      int select_width = int (h) + 1;
#  endif /* ACE_WIN32 */

      if (ACE_OS::select (select_width,
                          rd_mask, 0, 0,
                          &time_poll) < 0)
        {
          result = 1;
          this->remove_handler_i (h, ACE_Event_Handler::ALL_EVENTS_MASK);
          // Also clear the handle from the wait masks directly.
          this->wait_set_.rd_mask_.clr_bit (h);
          this->wait_set_.wr_mask_.clr_bit (h);
          this->wait_set_.ex_mask_.clr_bit (h);
        }
      // rd_mask only ever holds the single handle being probed.
      rd_mask.clr_bit (h);
#else /* !ACE_WIN32 && !MVS && !VXWORKS */
      // NOTE(review): unlike the select() branch above, this branch
      // does not clear the handle from wait_set_ itself -- confirm
      // that remove_handler_i() is sufficient here.
      struct stat temp;

      if (ACE_OS::fstat (h, &temp) == -1)
        {
          result = 1;
          this->remove_handler_i (h, ACE_Event_Handler::ALL_EVENTS_MASK);
        }
#endif /* ACE_WIN32 || MVS */
    }

  return result;
}

// Log the reactor state through ACELIB_DEBUG: the timer queue, handler
// repository and signal handler dump themselves, followed by the
// handles in wait_set_, ready_set_ and suspend_set_ and a few scalar
// members.  Compiles to an empty function unless ACE_HAS_DUMP is
// defined.
template void
ACE_Select_Reactor_T::dump () const
{
#if defined (ACE_HAS_DUMP)
  ACE_TRACE ("ACE_Select_Reactor_T::dump");

  ACELIB_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));

  // NOTE(review): timer_queue_ and signal_handler_ are dereferenced
  // without null checks, and close() can leave timer_queue_ at 0 --
  // confirm dump() is never called on a closed reactor.
  this->timer_queue_->dump ();
  this->handler_rep_.dump ();
  this->signal_handler_->dump ();
  ACELIB_DEBUG ((LM_DEBUG,
              ACE_TEXT ("delete_signal_handler_ = %d\n"),
              this->delete_signal_handler_));

  ACE_HANDLE h;

  // Handles currently being waited on.
  for (ACE_Handle_Set_Iterator handle_iter_wr (this->wait_set_.wr_mask_);
       (h = handle_iter_wr ()) != ACE_INVALID_HANDLE;)
    ACELIB_DEBUG ((LM_DEBUG, ACE_TEXT ("write_handle = %d\n"), h));

  for (ACE_Handle_Set_Iterator handle_iter_rd (this->wait_set_.rd_mask_);
       (h = handle_iter_rd ()) != ACE_INVALID_HANDLE;)
    ACELIB_DEBUG ((LM_DEBUG, ACE_TEXT ("read_handle = %d\n"), h));

  for (ACE_Handle_Set_Iterator handle_iter_ex (this->wait_set_.ex_mask_);
       (h = handle_iter_ex ()) != ACE_INVALID_HANDLE;)
    ACELIB_DEBUG ((LM_DEBUG, ACE_TEXT ("except_handle = %d\n"), h));

  // Handles already marked ready.
  for (ACE_Handle_Set_Iterator handle_iter_wr_ready (this->ready_set_.wr_mask_);
       (h = handle_iter_wr_ready ()) != ACE_INVALID_HANDLE;)
    ACELIB_DEBUG ((LM_DEBUG, ACE_TEXT ("write_handle_ready = %d\n"), h));

  for (ACE_Handle_Set_Iterator handle_iter_rd_ready (this->ready_set_.rd_mask_);
       (h = handle_iter_rd_ready ()) != ACE_INVALID_HANDLE;)
    ACELIB_DEBUG ((LM_DEBUG, ACE_TEXT ("read_handle_ready = %d\n"), h));

  for (ACE_Handle_Set_Iterator handle_iter_ex_ready (this->ready_set_.ex_mask_);
       (h = handle_iter_ex_ready ()) != ACE_INVALID_HANDLE;)
    ACELIB_DEBUG ((LM_DEBUG, ACE_TEXT ("except_handle_ready = %d\n"), h));

  // Suspended handles.
  for (ACE_Handle_Set_Iterator handle_iter_su_ready (this->suspend_set_.wr_mask_);
       (h = handle_iter_su_ready ()) != ACE_INVALID_HANDLE;)
    ACELIB_DEBUG ((LM_DEBUG, ACE_TEXT ("write_handle_suspend = %d\n"), h));

  for (ACE_Handle_Set_Iterator handle_iter_su_ready (this->suspend_set_.rd_mask_);
       (h = handle_iter_su_ready ()) != ACE_INVALID_HANDLE;)
    ACELIB_DEBUG ((LM_DEBUG, ACE_TEXT ("read_handle_suspend = %d\n"), h));

  for (ACE_Handle_Set_Iterator handle_iter_su_ready (this->suspend_set_.ex_mask_);
       (h = handle_iter_su_ready ()) != ACE_INVALID_HANDLE;)
    ACELIB_DEBUG ((LM_DEBUG, ACE_TEXT ("except_handle_suspend = %d\n"), h));

  ACELIB_DEBUG ((LM_DEBUG, ACE_TEXT ("restart_ = %d\n"), this->restart_));
  ACELIB_DEBUG ((LM_DEBUG,
              ACE_TEXT ("requeue_position_ = %d\n"),
              this->requeue_position_));
  ACELIB_DEBUG ((LM_DEBUG,
              ACE_TEXT ("initialized_ = %d\n"),
              this->initialized_));
  ACELIB_DEBUG ((LM_DEBUG,
              ACE_TEXT ("owner_ = %d\n"),
              this->owner_));

#if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0)
  this->notify_handler_->dump ();
  this->token_.dump ();
#endif /* ACE_MT_SAFE */

  ACELIB_DEBUG ((LM_DEBUG, ACE_END_DUMP));
#endif /* ACE_HAS_DUMP */
}

ACE_END_VERSIONED_NAMESPACE_DECL

#endif /* ACE_SELECT_REACTOR_T_CPP */