diff options
Diffstat (limited to 'ace/WFMO_Reactor.cpp')
-rw-r--r-- | ace/WFMO_Reactor.cpp | 249 |
1 files changed, 208 insertions, 41 deletions
diff --git a/ace/WFMO_Reactor.cpp b/ace/WFMO_Reactor.cpp index 3f55ad7a693..be24cc51913 100644 --- a/ace/WFMO_Reactor.cpp +++ b/ace/WFMO_Reactor.cpp @@ -652,6 +652,15 @@ ACE_WFMO_Reactor_Handler_Repository::bind_i (int io_entry, this->handles_to_be_added_++; + int requires_reference_counting = + event_handler->reference_counting_policy ().value () == + ACE_Event_Handler::Reference_Counting_Policy::ENABLED; + + if (requires_reference_counting) + { + event_handler->add_reference (); + } + // Wake up all threads in WaitForMultipleObjects so that they can // reconsult the handle set this->wfmo_reactor_.wakeup_all_threads (); @@ -672,7 +681,8 @@ ACE_WFMO_Reactor_Handler_Repository::make_changes_in_current_infos (void) // have been schedule for deletion if (this->handles_to_be_deleted_ > 0 || this->handles_to_be_suspended_ > 0) { - for (size_t i = 0; i < this->max_handlep1_; i++) + size_t i = 0; + while (i < this->max_handlep1_) { // This stuff is necessary here, since we should not make // the upcall until all the internal data structures have @@ -747,11 +757,29 @@ ACE_WFMO_Reactor_Handler_Repository::make_changes_in_current_infos (void) this->current_handles_[last_valid_slot] = ACE_INVALID_HANDLE; this->max_handlep1_--; } + else + { + // This current entry is not up for deletion or + // suspension. Proceed to the next entry in the current + // handles. + ++i; + } // Now that all internal structures have been updated, make // the upcall. if (event_handler != 0) - event_handler->handle_close (handle, masks); + { + int requires_reference_counting = + event_handler->reference_counting_policy ().value () == + ACE_Event_Handler::Reference_Counting_Policy::ENABLED; + + event_handler->handle_close (handle, masks); + + if (requires_reference_counting) + { + event_handler->remove_reference (); + } + } } } @@ -764,7 +792,8 @@ ACE_WFMO_Reactor_Handler_Repository::make_changes_in_suspension_infos (void) // Go through the <suspended_handle> array if (this->handles_to_be_deleted_ > 0 || this->handles_to_be_resumed_ > 0) { - for (size_t i = 0; i < this->suspended_handles_; i++) + size_t i = 0; + while (i < this->suspended_handles_) { // This stuff is necessary here, since we should not make // the upcall until all the internal data structures have @@ -836,11 +865,29 @@ ACE_WFMO_Reactor_Handler_Repository::make_changes_in_suspension_infos (void) this->current_suspended_info_[last_valid_slot].reset (); this->suspended_handles_--; } + else + { + // This current entry is not up for deletion or + // resumption. Proceed to the next entry in the + // suspended handles. + ++i; + } // Now that all internal structures have been updated, make // the upcall. if (event_handler != 0) - event_handler->handle_close (handle, masks); + { + int requires_reference_counting = + event_handler->reference_counting_policy ().value () == + ACE_Event_Handler::Reference_Counting_Policy::ENABLED; + + event_handler->handle_close (handle, masks); + + if (requires_reference_counting) + { + event_handler->remove_reference (); + } + } } } @@ -920,7 +967,18 @@ ACE_WFMO_Reactor_Handler_Repository::make_changes_in_to_be_added_infos (void) // Now that all internal structures have been updated, make the // upcall. if (event_handler != 0) - event_handler->handle_close (handle, masks); + { + int requires_reference_counting = + event_handler->reference_counting_policy ().value () == + ACE_Event_Handler::Reference_Counting_Policy::ENABLED; + + event_handler->handle_close (handle, masks); + + if (requires_reference_counting) + { + event_handler->remove_reference (); + } + } } // Since all to be added handles have been taken care of, reset the @@ -1282,12 +1340,15 @@ ACE_WFMO_Reactor::register_handler_i (ACE_HANDLE event_handle, // If this is a Winsock 1 system, the underlying event assignment will // not work, so don't try. Winsock 1 must use ACE_Select_Reactor for // reacting to socket activity. + #if !defined (ACE_HAS_WINSOCK2) || (ACE_HAS_WINSOCK2 == 0) + ACE_UNUSED_ARG (event_handle); ACE_UNUSED_ARG (io_handle); ACE_UNUSED_ARG (event_handler); ACE_UNUSED_ARG (new_masks); ACE_NOTSUP_RETURN (-1); + #else // Make sure that the <handle> is valid @@ -1361,7 +1422,9 @@ ACE_WFMO_Reactor::register_handler_i (ACE_HANDLE event_handle, } else return -1; -#endif /* ACE_HAS_PHARLAP */ + +#endif /* ACE_HAS_WINSOCK2 || ACE_HAS_WINSOCK2 == 0 */ + } int @@ -1467,15 +1530,22 @@ ACE_WFMO_Reactor_Handler_Repository::modify_network_events_i (ACE_HANDLE io_hand return found; } -int +ACE_Event_Handler * +ACE_WFMO_Reactor_Handler_Repository::find_handler (ACE_HANDLE handle) +{ + long existing_masks_ignored = 0; + return this->handler (handle, + existing_masks_ignored); +} + +ACE_Event_Handler * ACE_WFMO_Reactor_Handler_Repository::handler (ACE_HANDLE handle, - ACE_Reactor_Mask user_masks, - ACE_Event_Handler **user_event_handler) + long &existing_masks) { int found = 0; size_t i = 0; ACE_Event_Handler *event_handler = 0; - long existing_masks = 0; + existing_masks = 0; // Look for the handle first @@ -1521,7 +1591,36 @@ ACE_WFMO_Reactor_Handler_Repository::handler (ACE_HANDLE handle, existing_masks = this->to_be_added_info_[i].network_events_; } - // If the handle is not found, return failure. + if (event_handler) + { + int requires_reference_counting = + event_handler->reference_counting_policy ().value () == + ACE_Event_Handler::Reference_Counting_Policy::ENABLED; + + if (requires_reference_counting) + { + event_handler->add_reference (); + } + } + + return event_handler; +} + +int +ACE_WFMO_Reactor_Handler_Repository::handler (ACE_HANDLE handle, + ACE_Reactor_Mask user_masks, + ACE_Event_Handler **user_event_handler) +{ + long existing_masks = 0; + int found = 0; + + ACE_Event_Handler_var safe_event_handler = + this->handler (handle, + existing_masks); + + if (safe_event_handler.handler ()) + found = 1; + if (!found) return -1; @@ -1529,8 +1628,8 @@ ACE_WFMO_Reactor_Handler_Repository::handler (ACE_HANDLE handle, // are on. if (found && ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::READ_MASK)) - if (!ACE_BIT_ENABLED (existing_masks, FD_READ) - && !ACE_BIT_ENABLED (existing_masks, FD_CLOSE)) + if (!ACE_BIT_ENABLED (existing_masks, FD_READ) && + !ACE_BIT_ENABLED (existing_masks, FD_CLOSE)) found = 0; if (found && @@ -1565,7 +1664,7 @@ ACE_WFMO_Reactor_Handler_Repository::handler (ACE_HANDLE handle, if (found && user_event_handler) - *user_event_handler = event_handler; + *user_event_handler = safe_event_handler.release (); if (found) return 0; @@ -1899,14 +1998,29 @@ ACE_WFMO_Reactor::simple_dispatch_handler (DWORD slot, // siginfo_t is an ACE - specific fabrication. Constructor exists. siginfo_t sig (event_handle); - ACE_Event_Handler *eh = + ACE_Event_Handler *event_handler = this->handler_rep_.current_info ()[slot].event_handler_; + int requires_reference_counting = + event_handler->reference_counting_policy ().value () == + ACE_Event_Handler::Reference_Counting_Policy::ENABLED; + + if (requires_reference_counting) + { + event_handler->add_reference (); + } + // Upcall - if (eh->handle_signal (0, &sig) == -1) + if (event_handler->handle_signal (0, &sig) == -1) this->handler_rep_.unbind (event_handle, ACE_Event_Handler::NULL_MASK); + // Call remove_reference() if needed. + if (requires_reference_counting) + { + event_handler->remove_reference (); + } + return 0; } @@ -1941,10 +2055,30 @@ ACE_WFMO_Reactor::complex_dispatch_handler (DWORD slot, events.lNetworkEvents &= current_info.network_events_; while (events.lNetworkEvents != 0) { + ACE_Event_Handler *event_handler = + current_info.event_handler_; + + int reference_counting_required = + event_handler->reference_counting_policy ().value () == + ACE_Event_Handler::Reference_Counting_Policy::ENABLED; + + // Call add_reference() if needed. + if (reference_counting_required) + { + event_handler->add_reference (); + } + // Upcall problems |= this->upcall (current_info.event_handler_, current_info.io_handle_, events); + + // Call remove_reference() if needed. + if (reference_counting_required) + { + event_handler->remove_reference (); + } + if (this->handler_rep_.scheduled_for_deletion (slot)) break; } @@ -2276,25 +2410,32 @@ ACE_WFMO_Reactor_Notify::handle_signal (int signum, if (buffer->eh_ != 0) { + ACE_Event_Handler *event_handler = + buffer->eh_; + + int requires_reference_counting = + event_handler->reference_counting_policy ().value () == + ACE_Event_Handler::Reference_Counting_Policy::ENABLED; + int result = 0; switch (buffer->mask_) { case ACE_Event_Handler::READ_MASK: case ACE_Event_Handler::ACCEPT_MASK: - result = buffer->eh_->handle_input (ACE_INVALID_HANDLE); + result = event_handler->handle_input (ACE_INVALID_HANDLE); break; case ACE_Event_Handler::WRITE_MASK: - result = buffer->eh_->handle_output (ACE_INVALID_HANDLE); + result = event_handler->handle_output (ACE_INVALID_HANDLE); break; case ACE_Event_Handler::EXCEPT_MASK: - result = buffer->eh_->handle_exception (ACE_INVALID_HANDLE); + result = event_handler->handle_exception (ACE_INVALID_HANDLE); break; case ACE_Event_Handler::QOS_MASK: - result = buffer->eh_->handle_qos (ACE_INVALID_HANDLE); + result = event_handler->handle_qos (ACE_INVALID_HANDLE); break; case ACE_Event_Handler::GROUP_QOS_MASK: - result = buffer->eh_->handle_group_qos (ACE_INVALID_HANDLE); + result = event_handler->handle_group_qos (ACE_INVALID_HANDLE); break; default: ACE_ERROR ((LM_ERROR, @@ -2302,9 +2443,15 @@ ACE_WFMO_Reactor_Notify::handle_signal (int signum, buffer->mask_)); break; } + if (result == -1) - buffer->eh_->handle_close (ACE_INVALID_HANDLE, - ACE_Event_Handler::EXCEPT_MASK); + event_handler->handle_close (ACE_INVALID_HANDLE, + ACE_Event_Handler::EXCEPT_MASK); + + if (requires_reference_counting) + { + event_handler->remove_reference (); + } } // Make sure to delete the memory regardless of success or @@ -2333,11 +2480,11 @@ ACE_WFMO_Reactor_Notify::handle_signal (int signum, // thread of control. int -ACE_WFMO_Reactor_Notify::notify (ACE_Event_Handler *eh, +ACE_WFMO_Reactor_Notify::notify (ACE_Event_Handler *event_handler, ACE_Reactor_Mask mask, ACE_Time_Value *timeout) { - if (eh != 0) + if (event_handler != 0) { ACE_Message_Block *mb = 0; ACE_NEW_RETURN (mb, @@ -2346,7 +2493,7 @@ ACE_WFMO_Reactor_Notify::notify (ACE_Event_Handler *eh, ACE_Notification_Buffer *buffer = (ACE_Notification_Buffer *) mb->base (); - buffer->eh_ = eh; + buffer->eh_ = event_handler; buffer->mask_ = mask; // Convert from relative time to absolute time by adding the @@ -2361,6 +2508,15 @@ ACE_WFMO_Reactor_Notify::notify (ACE_Event_Handler *eh, mb->release (); return -1; } + + int requires_reference_counting = + event_handler->reference_counting_policy ().value () == + ACE_Event_Handler::Reference_Counting_Policy::ENABLED; + + if (requires_reference_counting) + { + event_handler->add_reference (); + } } return this->wakeup_one_thread_.signal (); @@ -2433,22 +2589,33 @@ ACE_WFMO_Reactor_Notify::purge_pending_notifications (ACE_Event_Handler *eh, ACE_BIT_DISABLED (buffer->mask_, ~mask)) // the existing notification mask // is left with nothing when // applying the mask - { - mb->release (); - ++number_purged; - } + { + ACE_Event_Handler *event_handler = eh; + + int requires_reference_counting = + event_handler->reference_counting_policy ().value () == + ACE_Event_Handler::Reference_Counting_Policy::ENABLED; + + if (requires_reference_counting) + { + event_handler->remove_reference (); + } + + mb->release (); + ++number_purged; + } else - { - // To preserve it, move it to the local_queue. But first, if - // this is not a Reactor notify (it is for a - // particularhandler), and it matches the specified handler - // (or purging all), then apply the mask - if ((0 != buffer->eh_) && - (0 == eh || eh == buffer->eh_)) - ACE_CLR_BITS(buffer->mask_, mask); - if (-1 == local_queue.enqueue_head (mb)) - return -1; - } + { + // To preserve it, move it to the local_queue. But first, if + // this is not a Reactor notify (it is for a + // particularhandler), and it matches the specified handler + // (or purging all), then apply the mask + if ((0 != buffer->eh_) && + (0 == eh || eh == buffer->eh_)) + ACE_CLR_BITS(buffer->mask_, mask); + if (-1 == local_queue.enqueue_head (mb)) + return -1; + } } if (this->message_queue_.message_count ()) |