diff options
Diffstat (limited to 'ace/Select_Reactor_Base.cpp')
-rw-r--r-- | ace/Select_Reactor_Base.cpp | 287 |
1 files changed, 180 insertions, 107 deletions
diff --git a/ace/Select_Reactor_Base.cpp b/ace/Select_Reactor_Base.cpp index f0e71bcffc3..44232c79647 100644 --- a/ace/Select_Reactor_Base.cpp +++ b/ace/Select_Reactor_Base.cpp @@ -123,10 +123,10 @@ int ACE_Select_Reactor_Handler_Repository::unbind_all (void) { // Unbind all of the <handle, ACE_Event_Handler>s. - for (int handle = 0; - handle < this->max_handlep1_; - handle++) - this->unbind (ACE_SELECT_REACTOR_HANDLE (handle), + for (int slot = 0; + slot < this->max_handlep1_; + slot++) + this->unbind (ACE_SELECT_REACTOR_HANDLE (slot), ACE_Event_Handler::ALL_EVENTS_MASK); return 0; @@ -207,25 +207,44 @@ ACE_Select_Reactor_Handler_Repository::bind (ACE_HANDLE handle, if (this->invalid_handle (handle)) return -1; + // Is this handle already in the Reactor? + int existing_handle = 0; + #if defined (ACE_WIN32) + int assigned_slot = -1; for (ssize_t i = 0; i < this->max_handlep1_; i++) { - // Found it, so let's just reuse this location. + // If handle is already registered. if (ACE_SELECT_REACTOR_HANDLE (i) == handle) { + // Cannot use a different handler for an existing handle. + if (ACE_SELECT_REACTOR_EVENT_HANDLER (this, i) != + event_handler) + return -1; + + // Remember location. assigned_slot = i; + + // Remember that this handle is already registered in the + // Reactor. + existing_handle = 1; + + // We can stop looking now. break; } - // Here's the first free slot, so let's take it. - else if (ACE_SELECT_REACTOR_HANDLE (i) == ACE_INVALID_HANDLE - && assigned_slot == -1) - assigned_slot = i; + else + // Here's the first free slot, so let's take it. + if (ACE_SELECT_REACTOR_HANDLE (i) == ACE_INVALID_HANDLE && + assigned_slot == -1) + { + assigned_slot = i; + } } if (assigned_slot > -1) - // We found a free spot, let's reuse it. + // We found a spot. { ACE_SELECT_REACTOR_HANDLE (assigned_slot) = handle; ACE_SELECT_REACTOR_EVENT_HANDLER (this, assigned_slot) = event_handler; @@ -243,11 +262,29 @@ ACE_Select_Reactor_Handler_Repository::bind (ACE_HANDLE handle, errno = ENOMEM; return -1; } + #else + + // Check if this handle is already registered. + ACE_Event_Handler *current_handler = + ACE_SELECT_REACTOR_EVENT_HANDLER (this, handle); + + if (current_handler) + { + // Cannot use a different handler for an existing handle. + if (current_handler != event_handler) + return -1; + + // Remember that this handle is already registered in the + // Reactor. + existing_handle = 1; + } + ACE_SELECT_REACTOR_EVENT_HANDLER (this, handle) = event_handler; if (this->max_handlep1_ < handle + 1) this->max_handlep1_ = handle + 1; + #endif /* ACE_WIN32 */ if (this->select_reactor_.is_suspended_i (handle)) @@ -270,15 +307,18 @@ ACE_Select_Reactor_Handler_Repository::bind (ACE_HANDLE handle, this->select_reactor_.state_changed_ = 1; } - /* - // @@NOTE: We used to do this in earlier versions of ACE+TAO. But - // this is totally wrong.. - // Clear any suspend masks for it too. - this->select_reactor_.bit_ops (handle, - mask, - this->select_reactor_.suspend_set_, - ACE_Reactor::CLR_MASK); - */ + // If new entry, call add_reference() if needed. + if (!existing_handle) + { + int requires_reference_counting = + event_handler->reference_counting_policy ().value () == + ACE_Event_Handler::Reference_Counting_Policy::ENABLED; + + if (requires_reference_counting) + { + event_handler->add_reference (); + } + } return 0; } @@ -292,9 +332,9 @@ ACE_Select_Reactor_Handler_Repository::unbind (ACE_HANDLE handle, ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::unbind"); size_t slot; - ACE_Event_Handler *eh = this->find (handle, &slot); + ACE_Event_Handler *event_handler = this->find (handle, &slot); - if (eh == 0) + if (event_handler == 0) return -1; // Clear out the <mask> bits in the Select_Reactor's wait_set. @@ -314,11 +354,6 @@ ACE_Select_Reactor_Handler_Repository::unbind (ACE_HANDLE handle, // keep going or if it needs to reconsult select(). this->select_reactor_.state_changed_ = 1; - // Close down the <Event_Handler> unless we've been instructed not - // to. - if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::DONT_CALL) == 0) - eh->handle_close (handle, mask); - // If there are no longer any outstanding events on this <handle> // then we can totally shut down the Event_Handler. @@ -331,14 +366,19 @@ ACE_Select_Reactor_Handler_Repository::unbind (ACE_HANDLE handle, || this->select_reactor_.suspend_set_.wr_mask_.is_set (handle) || this->select_reactor_.suspend_set_.ex_mask_.is_set (handle)); - if (!has_any_wait_mask - && !has_any_suspend_mask - && (this->find (handle, &slot) == eh)) -#if defined (ACE_WIN32) + int complete_removal = 0; + + if (!has_any_wait_mask && !has_any_suspend_mask) { - ACE_SELECT_REACTOR_HANDLE (slot) = ACE_INVALID_HANDLE; + // The handle has been completed removed. + complete_removal = 1; + ACE_SELECT_REACTOR_EVENT_HANDLER (this, slot) = 0; +#if defined (ACE_WIN32) + + ACE_SELECT_REACTOR_HANDLE (slot) = ACE_INVALID_HANDLE; + if (this->max_handlep1_ == (int) slot + 1) { // We've deleted the last entry (i.e., i + 1 == the current @@ -355,43 +395,60 @@ ACE_Select_Reactor_Handler_Repository::unbind (ACE_HANDLE handle, this->max_handlep1_ = i + 1; } - } + #else - { - ACE_SELECT_REACTOR_EVENT_HANDLER (this, handle) = 0; - if (this->max_handlep1_ == handle + 1) - { - // We've deleted the last entry, so we need to figure out - // the last valid place in the array that is worth looking - // at. - ACE_HANDLE wait_rd_max = this->select_reactor_.wait_set_.rd_mask_.max_set (); - ACE_HANDLE wait_wr_max = this->select_reactor_.wait_set_.wr_mask_.max_set (); - ACE_HANDLE wait_ex_max = this->select_reactor_.wait_set_.ex_mask_.max_set (); - - ACE_HANDLE suspend_rd_max = this->select_reactor_.suspend_set_.rd_mask_.max_set (); - ACE_HANDLE suspend_wr_max = this->select_reactor_.suspend_set_.wr_mask_.max_set (); - ACE_HANDLE suspend_ex_max = this->select_reactor_.suspend_set_.ex_mask_.max_set (); - - // Compute the maximum of six values. - this->max_handlep1_ = wait_rd_max; - if (this->max_handlep1_ < wait_wr_max) - this->max_handlep1_ = wait_wr_max; - if (this->max_handlep1_ < wait_ex_max) - this->max_handlep1_ = wait_ex_max; - - if (this->max_handlep1_ < suspend_rd_max) - this->max_handlep1_ = suspend_rd_max; - if (this->max_handlep1_ < suspend_wr_max) - this->max_handlep1_ = suspend_wr_max; - if (this->max_handlep1_ < suspend_ex_max) - this->max_handlep1_ = suspend_ex_max; - - this->max_handlep1_++; - } - } + if (this->max_handlep1_ == handle + 1) + { + // We've deleted the last entry, so we need to figure out + // the last valid place in the array that is worth looking + // at. + ACE_HANDLE wait_rd_max = this->select_reactor_.wait_set_.rd_mask_.max_set (); + ACE_HANDLE wait_wr_max = this->select_reactor_.wait_set_.wr_mask_.max_set (); + ACE_HANDLE wait_ex_max = this->select_reactor_.wait_set_.ex_mask_.max_set (); + + ACE_HANDLE suspend_rd_max = this->select_reactor_.suspend_set_.rd_mask_.max_set (); + ACE_HANDLE suspend_wr_max = this->select_reactor_.suspend_set_.wr_mask_.max_set (); + ACE_HANDLE suspend_ex_max = this->select_reactor_.suspend_set_.ex_mask_.max_set (); + + // Compute the maximum of six values. + this->max_handlep1_ = wait_rd_max; + if (this->max_handlep1_ < wait_wr_max) + this->max_handlep1_ = wait_wr_max; + if (this->max_handlep1_ < wait_ex_max) + this->max_handlep1_ = wait_ex_max; + + if (this->max_handlep1_ < suspend_rd_max) + this->max_handlep1_ = suspend_rd_max; + if (this->max_handlep1_ < suspend_wr_max) + this->max_handlep1_ = suspend_wr_max; + if (this->max_handlep1_ < suspend_ex_max) + this->max_handlep1_ = suspend_ex_max; + + this->max_handlep1_++; + } + #endif /* ACE_WIN32 */ + } + + int requires_reference_counting = + event_handler->reference_counting_policy ().value () == + ACE_Event_Handler::Reference_Counting_Policy::ENABLED; + + // Close down the <Event_Handler> unless we've been instructed not + // to. + if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::DONT_CALL) == 0) + event_handler->handle_close (handle, mask); + + // Call remove_reference() if the removal is complete and reference + // counting is needed. + if (complete_removal && + requires_reference_counting) + { + event_handler->remove_reference (); + } + return 0; } @@ -466,13 +523,13 @@ ACE_Select_Reactor_Handler_Repository::dump (void) const this->max_handlep1_, this->max_size_)); ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("["))); - ACE_Event_Handler *eh = 0; + ACE_Event_Handler *event_handler = 0; for (ACE_Select_Reactor_Handler_Repository_Iterator iter (this); - iter.next (eh) != 0; + iter.next (event_handler) != 0; iter.advance ()) - ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT (" (eh = %x, eh->handle_ = %d)"), - eh, eh->get_handle ())); + ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT (" (event_handler = %x, event_handler->handle_ = %d)"), + event_handler, event_handler->get_handle ())); ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT (" ]"))); ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); @@ -549,6 +606,18 @@ ACE_Select_Reactor_Notify::purge_pending_notifications (ACE_Event_Handler *eh, ACE_LIB_TEXT ("%p\n"), ACE_LIB_TEXT ("enqueue_head")), -1); + + ACE_Event_Handler *event_handler = eh; + + int requires_reference_counting = + event_handler->reference_counting_policy ().value () == + ACE_Event_Handler::Reference_Counting_Policy::ENABLED; + + if (requires_reference_counting) + { + event_handler->remove_reference (); + } + ++number_purged; } else @@ -696,7 +765,7 @@ ACE_Select_Reactor_Notify::close (void) } int -ACE_Select_Reactor_Notify::notify (ACE_Event_Handler *eh, +ACE_Select_Reactor_Notify::notify (ACE_Event_Handler *event_handler, ACE_Reactor_Mask mask, ACE_Time_Value *timeout) { @@ -707,7 +776,22 @@ ACE_Select_Reactor_Notify::notify (ACE_Event_Handler *eh, if (this->select_reactor_ == 0) return 0; - ACE_Notification_Buffer buffer (eh, mask); + ACE_Event_Handler_var safe_handler; + + if (event_handler) + { + int requires_reference_counting = + event_handler->reference_counting_policy ().value () == + ACE_Event_Handler::Reference_Counting_Policy::ENABLED; + + if (requires_reference_counting) + { + event_handler->add_reference (); + safe_handler = event_handler; + } + } + + ACE_Notification_Buffer buffer (event_handler, mask); #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) // Artificial scope to limit the duration of the mutex. @@ -765,6 +849,9 @@ ACE_Select_Reactor_Notify::notify (ACE_Event_Handler *eh, if (n == -1) return -1; + // No failures. + safe_handler.release (); + return 0; } @@ -866,7 +953,9 @@ int ACE_Select_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &buffer) { int result = 0; + #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) + // Dispatch all messages that are in the <notify_queue_>. { // We acquire the lock in a block to make sure we're not @@ -890,35 +979,8 @@ ACE_Select_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &buffer) -1); } - // If eh == 0 then another thread is unblocking the - // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s - // internal structures. Otherwise, we need to dispatch the - // appropriate handle_* method on the <ACE_Event_Handler> - // pointer we've been passed. - if (buffer.eh_ != 0) - { +#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ - switch (buffer.mask_) - { - case ACE_Event_Handler::READ_MASK: - case ACE_Event_Handler::ACCEPT_MASK: - result = buffer.eh_->handle_input (ACE_INVALID_HANDLE); - break; - case ACE_Event_Handler::WRITE_MASK: - result = buffer.eh_->handle_output (ACE_INVALID_HANDLE); - break; - case ACE_Event_Handler::EXCEPT_MASK: - result = buffer.eh_->handle_exception (ACE_INVALID_HANDLE); - break; - default: - // Should we bail out if we get an invalid mask? - ACE_ERROR ((LM_ERROR, ACE_LIB_TEXT ("invalid mask = %d\n"), buffer.mask_)); - } - if (result == -1) - buffer.eh_->handle_close (ACE_INVALID_HANDLE, - ACE_Event_Handler::EXCEPT_MASK); - } -#else // If eh == 0 then another thread is unblocking the // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s // internal structures. Otherwise, we need to dispatch the @@ -926,23 +988,30 @@ ACE_Select_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &buffer) // pointer we've been passed. if (buffer.eh_ != 0) { + ACE_Event_Handler *event_handler = + buffer.eh_; + + int requires_reference_counting = + event_handler->reference_counting_policy ().value () == + ACE_Event_Handler::Reference_Counting_Policy::ENABLED; + switch (buffer.mask_) { case ACE_Event_Handler::READ_MASK: case ACE_Event_Handler::ACCEPT_MASK: - result = buffer.eh_->handle_input (ACE_INVALID_HANDLE); + result = event_handler->handle_input (ACE_INVALID_HANDLE); break; case ACE_Event_Handler::WRITE_MASK: - result = buffer.eh_->handle_output (ACE_INVALID_HANDLE); + result = event_handler->handle_output (ACE_INVALID_HANDLE); break; case ACE_Event_Handler::EXCEPT_MASK: - result = buffer.eh_->handle_exception (ACE_INVALID_HANDLE); + result = event_handler->handle_exception (ACE_INVALID_HANDLE); break; case ACE_Event_Handler::QOS_MASK: - result = buffer.eh_->handle_qos (ACE_INVALID_HANDLE); + result = event_handler->handle_qos (ACE_INVALID_HANDLE); break; case ACE_Event_Handler::GROUP_QOS_MASK: - result = buffer.eh_->handle_group_qos (ACE_INVALID_HANDLE); + result = event_handler->handle_group_qos (ACE_INVALID_HANDLE); break; default: // Should we bail out if we get an invalid mask? @@ -950,12 +1019,16 @@ ACE_Select_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &buffer) ACE_LIB_TEXT ("invalid mask = %d\n"), buffer.mask_)); } + if (result == -1) - buffer.eh_->handle_close (ACE_INVALID_HANDLE, - ACE_Event_Handler::EXCEPT_MASK); - } + event_handler->handle_close (ACE_INVALID_HANDLE, + ACE_Event_Handler::EXCEPT_MASK); -#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ + if (requires_reference_counting) + { + event_handler->remove_reference (); + } + } return 1; } |