// $Id$ #define ACE_BUILD_DLL #include "ace/Select_Reactor.h" #include "ace/Reactor.h" #include "ace/Thread.h" #include "ace/Synch_T.h" #include "ace/SOCK_Acceptor.h" #include "ace/SOCK_Connector.h" #include "ace/Timer_Heap.h" #if !defined (__ACE_INLINE__) #include "ace/Select_Reactor.i" #endif /* __ACE_INLINE__ */ ACE_ALLOC_HOOK_DEFINE(ACE_Select_Reactor) #if defined (ACE_WIN32) #define ACE_SELECT_REACTOR_HANDLE(H) (this->event_handlers_[(H)].handle_) #define ACE_SELECT_REACTOR_EVENT_HANDLER(THIS,H) ((THIS)->event_handlers_[(H)].event_handler_) #else #define ACE_SELECT_REACTOR_HANDLE(H) (H) #define ACE_SELECT_REACTOR_EVENT_HANDLER(THIS,H) ((THIS)->event_handlers_[(H)]) #endif /* ACE_WIN32 */ // Performs sanity checking on the ACE_HANDLE. int ACE_Select_Reactor_Handler_Repository::invalid_handle (ACE_HANDLE handle) { ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::invalid_handle"); #if defined (ACE_WIN32) // It's too expensive to perform more exhaustive validity checks on // Win32 due to the way that they implement SOCKET HANDLEs. if (handle == ACE_INVALID_HANDLE) #else /* !ACE_WIN32 */ if (handle < 0 || handle >= this->max_size_) #endif /* ACE_WIN32 */ { errno = EINVAL; return 1; } else return 0; } // Performs sanity checking on the ACE_HANDLE. int ACE_Select_Reactor_Handler_Repository::handle_in_range (ACE_HANDLE handle) { ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::handle_in_range"); #if defined (ACE_WIN32) // It's too expensive to perform more exhaustive validity checks on // Win32 due to the way that they implement SOCKET HANDLEs. if (handle != ACE_INVALID_HANDLE) #else /* !ACE_WIN32 */ if (handle >= 0 && handle < this->max_handlep1_) #endif /* ACE_WIN32 */ return 1; else { errno = EINVAL; return 0; } } size_t ACE_Select_Reactor_Handler_Repository::max_handlep1 (void) { ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::max_handlep1"); return this->max_handlep1_; } int ACE_Select_Reactor_Handler_Repository::open (size_t size) { ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::open"); this->max_size_ = size; this->max_handlep1_ = 0; #if defined (ACE_WIN32) // Try to allocate the memory. ACE_NEW_RETURN (this->event_handlers_, ACE_Event_Tuple[size], -1); // Initialize the ACE_Event_Handler * to { ACE_INVALID_HANDLE, 0 }. for (size_t h = 0; h < size; h++) { ACE_SELECT_REACTOR_HANDLE (h) = ACE_INVALID_HANDLE; ACE_SELECT_REACTOR_EVENT_HANDLER (this, h) = 0; } #else // Try to allocate the memory. ACE_NEW_RETURN (this->event_handlers_, ACE_Event_Handler *[size], -1); // Initialize the ACE_Event_Handler * to NULL. for (size_t h = 0; h < size; h++) ACE_SELECT_REACTOR_EVENT_HANDLER (this, h) = 0; #endif /* ACE_WIN32 */ // Check to see if the user is asking for too much and fail in this // case. if (size > FD_SETSIZE) { errno = ERANGE; return -1; } // Increase the number of handles if is greater than the // current limit. if (size < (size_t) ACE::max_handles ()) return ACE::set_handle_limit (size); else return 0; } // Initialize a repository of the appropriate . ACE_Select_Reactor_Handler_Repository::ACE_Select_Reactor_Handler_Repository (ACE_Select_Reactor &select_reactor) : select_reactor_ (select_reactor), max_size_ (0), max_handlep1_ (0), event_handlers_ (0) { ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::ACE_Select_Reactor_Handler_Repository"); } int ACE_Select_Reactor_Handler_Repository::unbind_all (void) { // Unbind all of the s. for (int handle = 0; handle < this->max_handlep1_; handle++) this->unbind (ACE_SELECT_REACTOR_HANDLE (handle), ACE_Event_Handler::ALL_EVENTS_MASK); return 0; } int ACE_Select_Reactor_Handler_Repository::close (void) { ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::close"); if (this->event_handlers_ != 0) { this->unbind_all (); delete [] this->event_handlers_; this->event_handlers_ = 0; } return 0; } // Return the associated with the . ACE_Event_Handler * ACE_Select_Reactor_Handler_Repository::find (ACE_HANDLE handle, size_t *index_p) { ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::find"); ACE_Event_Handler *eh = 0; ssize_t i; // Only bother to search for the if it's in range. if (this->handle_in_range (handle)) { #if defined (ACE_WIN32) i = 0; for (; i < this->max_handlep1_; i++) if (ACE_SELECT_REACTOR_HANDLE (i) == handle) { eh = ACE_SELECT_REACTOR_EVENT_HANDLER (this, i); break; } #else i = handle; eh = ACE_SELECT_REACTOR_EVENT_HANDLER (this, handle); #endif /* ACE_WIN32 */ } else // g++ can't figure out that i won't be used below if the handle // is out of range, so keep it happy by defining i here . . . i = 0; if (eh != 0 && index_p != 0) *index_p = i; else errno = ENOENT; return eh; } // Bind the to the . int ACE_Select_Reactor_Handler_Repository::bind (ACE_HANDLE handle, ACE_Event_Handler *event_handler, ACE_Reactor_Mask mask) { ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::bind"); if (handle == ACE_INVALID_HANDLE) handle = event_handler->get_handle (); if (this->invalid_handle (handle)) return -1; #if defined (ACE_WIN32) int assigned_slot = -1; for (ssize_t i = 0; i < this->max_handlep1_; i++) { // Found it, so let's just reuse this location. if (ACE_SELECT_REACTOR_HANDLE (i) == handle) { assigned_slot = i; break; } // Here's the first free slot, so let's take it. else if (ACE_SELECT_REACTOR_HANDLE (i) == ACE_INVALID_HANDLE && assigned_slot == -1) assigned_slot = i; } if (assigned_slot > -1) // We found a free spot, let's reuse it. { ACE_SELECT_REACTOR_HANDLE (assigned_slot) = handle; ACE_SELECT_REACTOR_EVENT_HANDLER (this, assigned_slot) = event_handler; } else if (this->max_handlep1_ < this->max_size_) { // Insert at the end of the active portion. ACE_SELECT_REACTOR_HANDLE (this->max_handlep1_) = handle; ACE_SELECT_REACTOR_EVENT_HANDLER (this, this->max_handlep1_) = event_handler; this->max_handlep1_++; } else { // No more room at the inn! errno = ENOMEM; return -1; } #else ACE_SELECT_REACTOR_EVENT_HANDLER (this, handle) = event_handler; if (this->max_handlep1_ < handle + 1) this->max_handlep1_ = handle + 1; #endif /* ACE_WIN32 */ // Add the for this in the Select_Reactor's wait_set. this->select_reactor_.bit_ops (handle, mask, this->select_reactor_.wait_set_, ACE_Reactor::ADD_MASK); // Note the fact that we've changed the state of the , // which is used by the dispatching loop to determine whether it can // keep going or if it needs to reconsult select(). this->select_reactor_.state_changed_ = 1; return 0; } // Remove the binding of . int ACE_Select_Reactor_Handler_Repository::unbind (ACE_HANDLE handle, ACE_Reactor_Mask mask) { ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::unbind"); size_t index; ACE_Event_Handler *eh = this->find (handle, &index); if (eh == 0) return -1; // Clear out the bits in the Select_Reactor's wait_set. this->select_reactor_.bit_ops (handle, mask, this->select_reactor_.wait_set_, ACE_Reactor::CLR_MASK); // Note the fact that we've changed the state of the , // which is used by the dispatching loop to determine whether it can // keep going or if it needs to reconsult select(). this->select_reactor_.state_changed_ = 1; // Close down the unless we've been instructed not // to. if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::DONT_CALL) == 0) eh->handle_close (handle, mask); // If there are no longer any outstanding events on this // then we can totally shut down the Event_Handler. if (this->select_reactor_.wait_set_.rd_mask_.is_set (handle) == 0 && this->select_reactor_.wait_set_.wr_mask_.is_set (handle) == 0 && this->select_reactor_.wait_set_.ex_mask_.is_set (handle) == 0) #if defined (ACE_WIN32) { ACE_SELECT_REACTOR_HANDLE (index) = ACE_INVALID_HANDLE; ACE_SELECT_REACTOR_EVENT_HANDLER (this, index) = 0; if (this->max_handlep1_ == (int) index + 1) { // We've deleted the last entry (i.e., i + 1 == the current // size of the array), so we need to figure out the last // valid place in the array that we should consider in // subsequent searches. int i; for (i = this->max_handlep1_ - 1; i >= 0 && ACE_SELECT_REACTOR_HANDLE (i) == ACE_INVALID_HANDLE; i--) continue; this->max_handlep1_ = i + 1; } } #else { ACE_SELECT_REACTOR_EVENT_HANDLER (this, handle) = 0; if (this->max_handlep1_ == handle + 1) { // We've deleted the last entry, so we need to figure out // the last valid place in the array that is worth looking // at. ACE_HANDLE rd_max = this->select_reactor_.wait_set_.rd_mask_.max_set (); ACE_HANDLE wr_max = this->select_reactor_.wait_set_.wr_mask_.max_set (); ACE_HANDLE ex_max = this->select_reactor_.wait_set_.ex_mask_.max_set (); // Compute the maximum of three values. this->max_handlep1_ = rd_max < wr_max ? wr_max : rd_max; if (this->max_handlep1_ < ex_max) this->max_handlep1_ = ex_max; this->max_handlep1_++; } } #endif /* ACE_WIN32 */ return 0; } ACE_Select_Reactor_Handler_Repository_Iterator::ACE_Select_Reactor_Handler_Repository_Iterator (const ACE_Select_Reactor_Handler_Repository *s) : rep_ (s), current_ (-1) { this->advance (); } // Pass back the that hasn't been seen in the Set. // Returns 0 when all items have been seen, else 1. int ACE_Select_Reactor_Handler_Repository_Iterator::next (ACE_Event_Handler *&next_item) { int result = 1; if (this->current_ >= this->rep_->max_handlep1_) result = 0; else next_item = ACE_SELECT_REACTOR_EVENT_HANDLER (this->rep_, this->current_); return result; } int ACE_Select_Reactor_Handler_Repository_Iterator::done (void) const { return this->current_ >= this->rep_->max_handlep1_; } // Move forward by one element in the set. int ACE_Select_Reactor_Handler_Repository_Iterator::advance (void) { if (this->current_ < this->rep_->max_handlep1_) this->current_++; while (this->current_ < this->rep_->max_handlep1_) if (ACE_SELECT_REACTOR_EVENT_HANDLER (this->rep_, this->current_) != 0) return 1; else this->current_++; return this->current_ < this->rep_->max_handlep1_; } // Dump the state of an object. void ACE_Select_Reactor_Handler_Repository_Iterator::dump (void) const { ACE_TRACE ("ACE_Select_Reactor_Handler_Repository_Iterator::dump"); ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("rep_ = %u"), this->rep_)); ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("current_ = %d"), this->current_)); ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); } void ACE_Select_Reactor_Handler_Repository::dump (void) const { ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::dump"); ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("(%t) max_handlep1_ = %d, max_size_ = %d\n"), this->max_handlep1_, this->max_size_)); ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("["))); ACE_Event_Handler *eh = 0; for (ACE_Select_Reactor_Handler_Repository_Iterator iter (this); iter.next (eh) != 0; iter.advance ()) ACE_DEBUG ((LM_DEBUG, ASYS_TEXT (" (eh = %x, eh->handle_ = %d)"), eh, eh->get_handle ())); ACE_DEBUG ((LM_DEBUG, ASYS_TEXT (" ]"))); ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); } ACE_ALLOC_HOOK_DEFINE(ACE_Select_Reactor_Handler_Repository_Iterator) int ACE_Select_Reactor::any_ready (ACE_Select_Reactor_Handle_Set &wait_set) { ACE_TRACE ("ACE_Select_Reactor::fill_in_ready"); #if !defined (ACE_WIN32) // Make this call signal safe. ACE_Sig_Guard sb; #endif /* ACE_WIN32 */ int number_ready = this->ready_set_.rd_mask_.num_set () + this->ready_set_.wr_mask_.num_set () + this->ready_set_.ex_mask_.num_set (); if (number_ready > 0) { wait_set.rd_mask_ = this->ready_set_.rd_mask_; wait_set.wr_mask_ = this->ready_set_.wr_mask_; wait_set.ex_mask_ = this->ready_set_.ex_mask_; this->ready_set_.rd_mask_.reset (); this->ready_set_.wr_mask_.reset (); this->ready_set_.ex_mask_.reset (); } return number_ready; } int ACE_Select_Reactor::handler_i (int signum, ACE_Event_Handler **eh) { ACE_TRACE ("ACE_Select_Reactor::handler_i"); ACE_Event_Handler *handler = this->signal_handler_->handler (signum); if (handler == 0) return -1; else if (*eh != 0) *eh = handler; return 0; } int ACE_Select_Reactor::initialized (void) { ACE_TRACE ("ACE_Select_Reactor::initialized"); ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_MUTEX, ace_mon, this->token_, 0)); return this->initialized_; } int ACE_Select_Reactor::owner (ACE_thread_t tid, ACE_thread_t *o_id) { ACE_TRACE ("ACE_Select_Reactor::owner"); ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_MUTEX, ace_mon, this->token_, -1)); if (o_id) *o_id = this->owner_; this->owner_ = tid; return 0; } int ACE_Select_Reactor::owner (ACE_thread_t *t_id) { ACE_TRACE ("ACE_Select_Reactor::owner"); ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_MUTEX, ace_mon, this->token_, -1)); *t_id = this->owner_; return 0; } void ACE_Select_Reactor::requeue_position (int rp) { ACE_TRACE ("ACE_Select_Reactor::requeue_position"); ACE_MT (ACE_GUARD (ACE_SELECT_REACTOR_MUTEX, ace_mon, this->token_)); #if defined (ACE_WIN32) ACE_UNUSED_ARG (rp); // Must always requeue ourselves "next" on Win32. this->requeue_position_ = 0; #else this->requeue_position_ = rp; #endif /* ACE_WIN32 */ } int ACE_Select_Reactor::requeue_position (void) { ACE_TRACE ("ACE_Select_Reactor::requeue_position"); ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_MUTEX, ace_mon, this->token_, -1)); return this->requeue_position_; } void ACE_Select_Reactor::max_notify_iterations (int iterations) { ACE_TRACE ("ACE_Select_Reactor::max_notify_iterations"); ACE_MT (ACE_GUARD (ACE_SELECT_REACTOR_MUTEX, ace_mon, this->token_)); // Must always be > 0 or < 0 to optimize the loop exit condition. if (iterations == 0) iterations = 1; this->max_notify_iterations_ = iterations; } int ACE_Select_Reactor::max_notify_iterations (void) { ACE_TRACE ("ACE_Select_Reactor::max_notify_iterations"); ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_MUTEX, ace_mon, this->token_, -1)); return this->max_notify_iterations_; } // Enqueue ourselves into the list of waiting threads. void ACE_Select_Reactor::renew (void) { ACE_TRACE ("ACE_Select_Reactor::renew"); #if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0) this->token_.renew (this->requeue_position_); #endif /* defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0) */ } #if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0) void ACE_Select_Reactor_Token::dump (void) const { ACE_TRACE ("ACE_Select_Reactor_Token::dump"); ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("\n"))); ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); } ACE_Select_Reactor_Token::ACE_Select_Reactor_Token (ACE_Select_Reactor &r) : select_reactor_ (&r) #if defined (ACE_SELECT_REACTOR_HAS_DEADLOCK_DETECTION) , ACE_Local_Mutex (0) // Triggers unique name by stringifying "this"... #endif /* ACE_SELECT_REACTOR_HAS_DEADLOCK_DETECTION */ { ACE_TRACE ("ACE_Select_Reactor_Token::ACE_Select_Reactor_Token"); } ACE_Select_Reactor_Token::ACE_Select_Reactor_Token (void) : select_reactor_ (0) #if defined (ACE_SELECT_REACTOR_HAS_DEADLOCK_DETECTION) , ACE_Local_Mutex (0) // Triggers unique name by stringifying "this"... #endif /* ACE_SELECT_REACTOR_HAS_DEADLOCK_DETECTION */ { ACE_TRACE ("ACE_Select_Reactor_Token::ACE_Select_Reactor_Token"); } ACE_Select_Reactor_Token::~ACE_Select_Reactor_Token (void) { ACE_TRACE ("ACE_Select_Reactor_Token::~ACE_Select_Reactor_Token"); } ACE_Select_Reactor & ACE_Select_Reactor_Token::select_reactor (void) { return *this->select_reactor_; } void ACE_Select_Reactor_Token::select_reactor (ACE_Select_Reactor &select_reactor) { this->select_reactor_ = &select_reactor; } // Used to wakeup the Select_Reactor. void ACE_Select_Reactor_Token::sleep_hook (void) { ACE_TRACE ("ACE_Select_Reactor_Token::sleep_hook"); if (this->select_reactor_->notify () == -1) ACE_ERROR ((LM_ERROR, ASYS_TEXT ("%p\n"), ASYS_TEXT ("sleep_hook failed"))); } #endif /* ACE_MT_SAFE */ void ACE_Select_Reactor_Notify::dump (void) const { ACE_TRACE ("ACE_Select_Reactor_Notify::dump"); ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("select_reactor_ = %x"), this->select_reactor_)); this->notification_pipe_.dump (); ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); } int ACE_Select_Reactor_Notify::open (ACE_Select_Reactor *r, int disable_notify_pipe) { ACE_TRACE ("ACE_Select_Reactor_Notify::open"); if (disable_notify_pipe == 0) { this->select_reactor_ = r; if (this->notification_pipe_.open () == -1) return -1; // There seems to be a Win32 bug with this... Set this into // non-blocking mode. if (ACE::set_flags (this->notification_pipe_.read_handle (), ACE_NONBLOCK) == -1) return -1; else return this->select_reactor_->register_handler (this->notification_pipe_.read_handle (), this, ACE_Event_Handler::READ_MASK); } else { this->select_reactor_ = 0; return 0; } } int ACE_Select_Reactor_Notify::close (void) { ACE_TRACE ("ACE_Select_Reactor_Notify::close"); return this->notification_pipe_.close (); } ssize_t ACE_Select_Reactor_Notify::notify (ACE_Event_Handler *eh, ACE_Reactor_Mask mask, ACE_Time_Value *timeout) { ACE_TRACE ("ACE_Select_Reactor_Notify::notify"); // Just consider this method a "no-op" if there's no // configured. if (this->select_reactor_ == 0) return 0; else { ACE_Notification_Buffer buffer (eh, mask); ssize_t n = ACE::send (this->notification_pipe_.write_handle (), (char *) &buffer, sizeof buffer, timeout); if (n == -1) return -1; else return 0; } } // Handles pending threads (if any) that are waiting to unblock the // Select_Reactor. int ACE_Select_Reactor_Notify::dispatch_notifications (int &number_of_active_handles, const ACE_Handle_Set &rd_mask) { ACE_TRACE ("ACE_Select_Reactor_Notify::handle_notification"); ACE_HANDLE read_handle = this->notification_pipe_.read_handle (); if (read_handle != ACE_INVALID_HANDLE && rd_mask.is_set (read_handle)) { number_of_active_handles--; return this->handle_input (read_handle); } else return 0; } // Special trick to unblock