summaryrefslogtreecommitdiff
path: root/ace/WFMO_Reactor.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ace/WFMO_Reactor.cpp')
-rw-r--r--ace/WFMO_Reactor.cpp249
1 files changed, 208 insertions, 41 deletions
diff --git a/ace/WFMO_Reactor.cpp b/ace/WFMO_Reactor.cpp
index 3f55ad7a693..be24cc51913 100644
--- a/ace/WFMO_Reactor.cpp
+++ b/ace/WFMO_Reactor.cpp
@@ -652,6 +652,15 @@ ACE_WFMO_Reactor_Handler_Repository::bind_i (int io_entry,
this->handles_to_be_added_++;
+ int requires_reference_counting =
+ event_handler->reference_counting_policy ().value () ==
+ ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
+
+ if (requires_reference_counting)
+ {
+ event_handler->add_reference ();
+ }
+
// Wake up all threads in WaitForMultipleObjects so that they can
// reconsult the handle set
this->wfmo_reactor_.wakeup_all_threads ();
@@ -672,7 +681,8 @@ ACE_WFMO_Reactor_Handler_Repository::make_changes_in_current_infos (void)
// have been schedule for deletion
if (this->handles_to_be_deleted_ > 0 || this->handles_to_be_suspended_ > 0)
{
- for (size_t i = 0; i < this->max_handlep1_; i++)
+ size_t i = 0;
+ while (i < this->max_handlep1_)
{
// This stuff is necessary here, since we should not make
// the upcall until all the internal data structures have
@@ -747,11 +757,29 @@ ACE_WFMO_Reactor_Handler_Repository::make_changes_in_current_infos (void)
this->current_handles_[last_valid_slot] = ACE_INVALID_HANDLE;
this->max_handlep1_--;
}
+ else
+ {
+ // This current entry is not up for deletion or
+ // suspension. Proceed to the next entry in the current
+ // handles.
+ ++i;
+ }
// Now that all internal structures have been updated, make
// the upcall.
if (event_handler != 0)
- event_handler->handle_close (handle, masks);
+ {
+ int requires_reference_counting =
+ event_handler->reference_counting_policy ().value () ==
+ ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
+
+ event_handler->handle_close (handle, masks);
+
+ if (requires_reference_counting)
+ {
+ event_handler->remove_reference ();
+ }
+ }
}
}
@@ -764,7 +792,8 @@ ACE_WFMO_Reactor_Handler_Repository::make_changes_in_suspension_infos (void)
// Go through the <suspended_handle> array
if (this->handles_to_be_deleted_ > 0 || this->handles_to_be_resumed_ > 0)
{
- for (size_t i = 0; i < this->suspended_handles_; i++)
+ size_t i = 0;
+ while (i < this->suspended_handles_)
{
// This stuff is necessary here, since we should not make
// the upcall until all the internal data structures have
@@ -836,11 +865,29 @@ ACE_WFMO_Reactor_Handler_Repository::make_changes_in_suspension_infos (void)
this->current_suspended_info_[last_valid_slot].reset ();
this->suspended_handles_--;
}
+ else
+ {
+ // This current entry is not up for deletion or
+ // resumption. Proceed to the next entry in the
+ // suspended handles.
+ ++i;
+ }
// Now that all internal structures have been updated, make
// the upcall.
if (event_handler != 0)
- event_handler->handle_close (handle, masks);
+ {
+ int requires_reference_counting =
+ event_handler->reference_counting_policy ().value () ==
+ ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
+
+ event_handler->handle_close (handle, masks);
+
+ if (requires_reference_counting)
+ {
+ event_handler->remove_reference ();
+ }
+ }
}
}
@@ -920,7 +967,18 @@ ACE_WFMO_Reactor_Handler_Repository::make_changes_in_to_be_added_infos (void)
// Now that all internal structures have been updated, make the
// upcall.
if (event_handler != 0)
- event_handler->handle_close (handle, masks);
+ {
+ int requires_reference_counting =
+ event_handler->reference_counting_policy ().value () ==
+ ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
+
+ event_handler->handle_close (handle, masks);
+
+ if (requires_reference_counting)
+ {
+ event_handler->remove_reference ();
+ }
+ }
}
// Since all to be added handles have been taken care of, reset the
@@ -1282,12 +1340,15 @@ ACE_WFMO_Reactor::register_handler_i (ACE_HANDLE event_handle,
// If this is a Winsock 1 system, the underlying event assignment will
// not work, so don't try. Winsock 1 must use ACE_Select_Reactor for
// reacting to socket activity.
+
#if !defined (ACE_HAS_WINSOCK2) || (ACE_HAS_WINSOCK2 == 0)
+
ACE_UNUSED_ARG (event_handle);
ACE_UNUSED_ARG (io_handle);
ACE_UNUSED_ARG (event_handler);
ACE_UNUSED_ARG (new_masks);
ACE_NOTSUP_RETURN (-1);
+
#else
// Make sure that the <handle> is valid
@@ -1361,7 +1422,9 @@ ACE_WFMO_Reactor::register_handler_i (ACE_HANDLE event_handle,
}
else
return -1;
-#endif /* ACE_HAS_PHARLAP */
+
+#endif /* ACE_HAS_WINSOCK2 || ACE_HAS_WINSOCK2 == 0 */
+
}
int
@@ -1467,15 +1530,22 @@ ACE_WFMO_Reactor_Handler_Repository::modify_network_events_i (ACE_HANDLE io_hand
return found;
}
-int
+ACE_Event_Handler *
+ACE_WFMO_Reactor_Handler_Repository::find_handler (ACE_HANDLE handle)
+{
+ long existing_masks_ignored = 0;
+ return this->handler (handle,
+ existing_masks_ignored);
+}
+
+ACE_Event_Handler *
ACE_WFMO_Reactor_Handler_Repository::handler (ACE_HANDLE handle,
- ACE_Reactor_Mask user_masks,
- ACE_Event_Handler **user_event_handler)
+ long &existing_masks)
{
int found = 0;
size_t i = 0;
ACE_Event_Handler *event_handler = 0;
- long existing_masks = 0;
+ existing_masks = 0;
// Look for the handle first
@@ -1521,7 +1591,36 @@ ACE_WFMO_Reactor_Handler_Repository::handler (ACE_HANDLE handle,
existing_masks = this->to_be_added_info_[i].network_events_;
}
- // If the handle is not found, return failure.
+ if (event_handler)
+ {
+ int requires_reference_counting =
+ event_handler->reference_counting_policy ().value () ==
+ ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
+
+ if (requires_reference_counting)
+ {
+ event_handler->add_reference ();
+ }
+ }
+
+ return event_handler;
+}
+
+int
+ACE_WFMO_Reactor_Handler_Repository::handler (ACE_HANDLE handle,
+ ACE_Reactor_Mask user_masks,
+ ACE_Event_Handler **user_event_handler)
+{
+ long existing_masks = 0;
+ int found = 0;
+
+ ACE_Event_Handler_var safe_event_handler =
+ this->handler (handle,
+ existing_masks);
+
+ if (safe_event_handler.handler ())
+ found = 1;
+
if (!found)
return -1;
@@ -1529,8 +1628,8 @@ ACE_WFMO_Reactor_Handler_Repository::handler (ACE_HANDLE handle,
// are on.
if (found &&
ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::READ_MASK))
- if (!ACE_BIT_ENABLED (existing_masks, FD_READ)
- && !ACE_BIT_ENABLED (existing_masks, FD_CLOSE))
+ if (!ACE_BIT_ENABLED (existing_masks, FD_READ) &&
+ !ACE_BIT_ENABLED (existing_masks, FD_CLOSE))
found = 0;
if (found &&
@@ -1565,7 +1664,7 @@ ACE_WFMO_Reactor_Handler_Repository::handler (ACE_HANDLE handle,
if (found &&
user_event_handler)
- *user_event_handler = event_handler;
+ *user_event_handler = safe_event_handler.release ();
if (found)
return 0;
@@ -1899,14 +1998,29 @@ ACE_WFMO_Reactor::simple_dispatch_handler (DWORD slot,
// siginfo_t is an ACE - specific fabrication. Constructor exists.
siginfo_t sig (event_handle);
- ACE_Event_Handler *eh =
+ ACE_Event_Handler *event_handler =
this->handler_rep_.current_info ()[slot].event_handler_;
+ int requires_reference_counting =
+ event_handler->reference_counting_policy ().value () ==
+ ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
+
+ if (requires_reference_counting)
+ {
+ event_handler->add_reference ();
+ }
+
// Upcall
- if (eh->handle_signal (0, &sig) == -1)
+ if (event_handler->handle_signal (0, &sig) == -1)
this->handler_rep_.unbind (event_handle,
ACE_Event_Handler::NULL_MASK);
+ // Call remove_reference() if needed.
+ if (requires_reference_counting)
+ {
+ event_handler->remove_reference ();
+ }
+
return 0;
}
@@ -1941,10 +2055,30 @@ ACE_WFMO_Reactor::complex_dispatch_handler (DWORD slot,
events.lNetworkEvents &= current_info.network_events_;
while (events.lNetworkEvents != 0)
{
+ ACE_Event_Handler *event_handler =
+ current_info.event_handler_;
+
+ int reference_counting_required =
+ event_handler->reference_counting_policy ().value () ==
+ ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
+
+ // Call add_reference() if needed.
+ if (reference_counting_required)
+ {
+ event_handler->add_reference ();
+ }
+
// Upcall
problems |= this->upcall (current_info.event_handler_,
current_info.io_handle_,
events);
+
+ // Call remove_reference() if needed.
+ if (reference_counting_required)
+ {
+ event_handler->remove_reference ();
+ }
+
if (this->handler_rep_.scheduled_for_deletion (slot))
break;
}
@@ -2276,25 +2410,32 @@ ACE_WFMO_Reactor_Notify::handle_signal (int signum,
if (buffer->eh_ != 0)
{
+ ACE_Event_Handler *event_handler =
+ buffer->eh_;
+
+ int requires_reference_counting =
+ event_handler->reference_counting_policy ().value () ==
+ ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
+
int result = 0;
switch (buffer->mask_)
{
case ACE_Event_Handler::READ_MASK:
case ACE_Event_Handler::ACCEPT_MASK:
- result = buffer->eh_->handle_input (ACE_INVALID_HANDLE);
+ result = event_handler->handle_input (ACE_INVALID_HANDLE);
break;
case ACE_Event_Handler::WRITE_MASK:
- result = buffer->eh_->handle_output (ACE_INVALID_HANDLE);
+ result = event_handler->handle_output (ACE_INVALID_HANDLE);
break;
case ACE_Event_Handler::EXCEPT_MASK:
- result = buffer->eh_->handle_exception (ACE_INVALID_HANDLE);
+ result = event_handler->handle_exception (ACE_INVALID_HANDLE);
break;
case ACE_Event_Handler::QOS_MASK:
- result = buffer->eh_->handle_qos (ACE_INVALID_HANDLE);
+ result = event_handler->handle_qos (ACE_INVALID_HANDLE);
break;
case ACE_Event_Handler::GROUP_QOS_MASK:
- result = buffer->eh_->handle_group_qos (ACE_INVALID_HANDLE);
+ result = event_handler->handle_group_qos (ACE_INVALID_HANDLE);
break;
default:
ACE_ERROR ((LM_ERROR,
@@ -2302,9 +2443,15 @@ ACE_WFMO_Reactor_Notify::handle_signal (int signum,
buffer->mask_));
break;
}
+
if (result == -1)
- buffer->eh_->handle_close (ACE_INVALID_HANDLE,
- ACE_Event_Handler::EXCEPT_MASK);
+ event_handler->handle_close (ACE_INVALID_HANDLE,
+ ACE_Event_Handler::EXCEPT_MASK);
+
+ if (requires_reference_counting)
+ {
+ event_handler->remove_reference ();
+ }
}
// Make sure to delete the memory regardless of success or
@@ -2333,11 +2480,11 @@ ACE_WFMO_Reactor_Notify::handle_signal (int signum,
// thread of control.
int
-ACE_WFMO_Reactor_Notify::notify (ACE_Event_Handler *eh,
+ACE_WFMO_Reactor_Notify::notify (ACE_Event_Handler *event_handler,
ACE_Reactor_Mask mask,
ACE_Time_Value *timeout)
{
- if (eh != 0)
+ if (event_handler != 0)
{
ACE_Message_Block *mb = 0;
ACE_NEW_RETURN (mb,
@@ -2346,7 +2493,7 @@ ACE_WFMO_Reactor_Notify::notify (ACE_Event_Handler *eh,
ACE_Notification_Buffer *buffer =
(ACE_Notification_Buffer *) mb->base ();
- buffer->eh_ = eh;
+ buffer->eh_ = event_handler;
buffer->mask_ = mask;
// Convert from relative time to absolute time by adding the
@@ -2361,6 +2508,15 @@ ACE_WFMO_Reactor_Notify::notify (ACE_Event_Handler *eh,
mb->release ();
return -1;
}
+
+ int requires_reference_counting =
+ event_handler->reference_counting_policy ().value () ==
+ ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
+
+ if (requires_reference_counting)
+ {
+ event_handler->add_reference ();
+ }
}
return this->wakeup_one_thread_.signal ();
@@ -2433,22 +2589,33 @@ ACE_WFMO_Reactor_Notify::purge_pending_notifications (ACE_Event_Handler *eh,
ACE_BIT_DISABLED (buffer->mask_, ~mask)) // the existing notification mask
// is left with nothing when
// applying the mask
- {
- mb->release ();
- ++number_purged;
- }
+ {
+ ACE_Event_Handler *event_handler = eh;
+
+ int requires_reference_counting =
+ event_handler->reference_counting_policy ().value () ==
+ ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
+
+ if (requires_reference_counting)
+ {
+ event_handler->remove_reference ();
+ }
+
+ mb->release ();
+ ++number_purged;
+ }
else
- {
- // To preserve it, move it to the local_queue. But first, if
- // this is not a Reactor notify (it is for a
- // particularhandler), and it matches the specified handler
- // (or purging all), then apply the mask
- if ((0 != buffer->eh_) &&
- (0 == eh || eh == buffer->eh_))
- ACE_CLR_BITS(buffer->mask_, mask);
- if (-1 == local_queue.enqueue_head (mb))
- return -1;
- }
+ {
+ // To preserve it, move it to the local_queue. But first, if
+ // this is not a Reactor notify (it is for a
+ // particularhandler), and it matches the specified handler
+ // (or purging all), then apply the mask
+ if ((0 != buffer->eh_) &&
+ (0 == eh || eh == buffer->eh_))
+ ACE_CLR_BITS(buffer->mask_, mask);
+ if (-1 == local_queue.enqueue_head (mb))
+ return -1;
+ }
}
if (this->message_queue_.message_count ())