summaryrefslogtreecommitdiff
path: root/ace/Select_Reactor_Base.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ace/Select_Reactor_Base.cpp')
-rw-r--r--ace/Select_Reactor_Base.cpp287
1 files changed, 180 insertions, 107 deletions
diff --git a/ace/Select_Reactor_Base.cpp b/ace/Select_Reactor_Base.cpp
index f0e71bcffc3..44232c79647 100644
--- a/ace/Select_Reactor_Base.cpp
+++ b/ace/Select_Reactor_Base.cpp
@@ -123,10 +123,10 @@ int
ACE_Select_Reactor_Handler_Repository::unbind_all (void)
{
// Unbind all of the <handle, ACE_Event_Handler>s.
- for (int handle = 0;
- handle < this->max_handlep1_;
- handle++)
- this->unbind (ACE_SELECT_REACTOR_HANDLE (handle),
+ for (int slot = 0;
+ slot < this->max_handlep1_;
+ slot++)
+ this->unbind (ACE_SELECT_REACTOR_HANDLE (slot),
ACE_Event_Handler::ALL_EVENTS_MASK);
return 0;
@@ -207,25 +207,44 @@ ACE_Select_Reactor_Handler_Repository::bind (ACE_HANDLE handle,
if (this->invalid_handle (handle))
return -1;
+ // Is this handle already in the Reactor?
+ int existing_handle = 0;
+
#if defined (ACE_WIN32)
+
int assigned_slot = -1;
for (ssize_t i = 0; i < this->max_handlep1_; i++)
{
- // Found it, so let's just reuse this location.
+ // If handle is already registered.
if (ACE_SELECT_REACTOR_HANDLE (i) == handle)
{
+ // Cannot use a different handler for an existing handle.
+ if (ACE_SELECT_REACTOR_EVENT_HANDLER (this, i) !=
+ event_handler)
+ return -1;
+
+ // Remember location.
assigned_slot = i;
+
+ // Remember that this handle is already registered in the
+ // Reactor.
+ existing_handle = 1;
+
+ // We can stop looking now.
break;
}
- // Here's the first free slot, so let's take it.
- else if (ACE_SELECT_REACTOR_HANDLE (i) == ACE_INVALID_HANDLE
- && assigned_slot == -1)
- assigned_slot = i;
+ else
+ // Here's the first free slot, so let's take it.
+ if (ACE_SELECT_REACTOR_HANDLE (i) == ACE_INVALID_HANDLE &&
+ assigned_slot == -1)
+ {
+ assigned_slot = i;
+ }
}
if (assigned_slot > -1)
- // We found a free spot, let's reuse it.
+ // We found a spot.
{
ACE_SELECT_REACTOR_HANDLE (assigned_slot) = handle;
ACE_SELECT_REACTOR_EVENT_HANDLER (this, assigned_slot) = event_handler;
@@ -243,11 +262,29 @@ ACE_Select_Reactor_Handler_Repository::bind (ACE_HANDLE handle,
errno = ENOMEM;
return -1;
}
+
#else
+
+ // Check if this handle is already registered.
+ ACE_Event_Handler *current_handler =
+ ACE_SELECT_REACTOR_EVENT_HANDLER (this, handle);
+
+ if (current_handler)
+ {
+ // Cannot use a different handler for an existing handle.
+ if (current_handler != event_handler)
+ return -1;
+
+ // Remember that this handle is already registered in the
+ // Reactor.
+ existing_handle = 1;
+ }
+
ACE_SELECT_REACTOR_EVENT_HANDLER (this, handle) = event_handler;
if (this->max_handlep1_ < handle + 1)
this->max_handlep1_ = handle + 1;
+
#endif /* ACE_WIN32 */
if (this->select_reactor_.is_suspended_i (handle))
@@ -270,15 +307,18 @@ ACE_Select_Reactor_Handler_Repository::bind (ACE_HANDLE handle,
this->select_reactor_.state_changed_ = 1;
}
- /*
- // @@NOTE: We used to do this in earlier versions of ACE+TAO. But
- // this is totally wrong..
- // Clear any suspend masks for it too.
- this->select_reactor_.bit_ops (handle,
- mask,
- this->select_reactor_.suspend_set_,
- ACE_Reactor::CLR_MASK);
- */
+ // If new entry, call add_reference() if needed.
+ if (!existing_handle)
+ {
+ int requires_reference_counting =
+ event_handler->reference_counting_policy ().value () ==
+ ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
+
+ if (requires_reference_counting)
+ {
+ event_handler->add_reference ();
+ }
+ }
return 0;
}
@@ -292,9 +332,9 @@ ACE_Select_Reactor_Handler_Repository::unbind (ACE_HANDLE handle,
ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::unbind");
size_t slot;
- ACE_Event_Handler *eh = this->find (handle, &slot);
+ ACE_Event_Handler *event_handler = this->find (handle, &slot);
- if (eh == 0)
+ if (event_handler == 0)
return -1;
// Clear out the <mask> bits in the Select_Reactor's wait_set.
@@ -314,11 +354,6 @@ ACE_Select_Reactor_Handler_Repository::unbind (ACE_HANDLE handle,
// keep going or if it needs to reconsult select().
this->select_reactor_.state_changed_ = 1;
- // Close down the <Event_Handler> unless we've been instructed not
- // to.
- if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::DONT_CALL) == 0)
- eh->handle_close (handle, mask);
-
// If there are no longer any outstanding events on this <handle>
// then we can totally shut down the Event_Handler.
@@ -331,14 +366,19 @@ ACE_Select_Reactor_Handler_Repository::unbind (ACE_HANDLE handle,
|| this->select_reactor_.suspend_set_.wr_mask_.is_set (handle)
|| this->select_reactor_.suspend_set_.ex_mask_.is_set (handle));
- if (!has_any_wait_mask
- && !has_any_suspend_mask
- && (this->find (handle, &slot) == eh))
-#if defined (ACE_WIN32)
+ int complete_removal = 0;
+
+ if (!has_any_wait_mask && !has_any_suspend_mask)
{
- ACE_SELECT_REACTOR_HANDLE (slot) = ACE_INVALID_HANDLE;
+ // The handle has been completed removed.
+ complete_removal = 1;
+
ACE_SELECT_REACTOR_EVENT_HANDLER (this, slot) = 0;
+#if defined (ACE_WIN32)
+
+ ACE_SELECT_REACTOR_HANDLE (slot) = ACE_INVALID_HANDLE;
+
if (this->max_handlep1_ == (int) slot + 1)
{
// We've deleted the last entry (i.e., i + 1 == the current
@@ -355,43 +395,60 @@ ACE_Select_Reactor_Handler_Repository::unbind (ACE_HANDLE handle,
this->max_handlep1_ = i + 1;
}
- }
+
#else
- {
- ACE_SELECT_REACTOR_EVENT_HANDLER (this, handle) = 0;
- if (this->max_handlep1_ == handle + 1)
- {
- // We've deleted the last entry, so we need to figure out
- // the last valid place in the array that is worth looking
- // at.
- ACE_HANDLE wait_rd_max = this->select_reactor_.wait_set_.rd_mask_.max_set ();
- ACE_HANDLE wait_wr_max = this->select_reactor_.wait_set_.wr_mask_.max_set ();
- ACE_HANDLE wait_ex_max = this->select_reactor_.wait_set_.ex_mask_.max_set ();
-
- ACE_HANDLE suspend_rd_max = this->select_reactor_.suspend_set_.rd_mask_.max_set ();
- ACE_HANDLE suspend_wr_max = this->select_reactor_.suspend_set_.wr_mask_.max_set ();
- ACE_HANDLE suspend_ex_max = this->select_reactor_.suspend_set_.ex_mask_.max_set ();
-
- // Compute the maximum of six values.
- this->max_handlep1_ = wait_rd_max;
- if (this->max_handlep1_ < wait_wr_max)
- this->max_handlep1_ = wait_wr_max;
- if (this->max_handlep1_ < wait_ex_max)
- this->max_handlep1_ = wait_ex_max;
-
- if (this->max_handlep1_ < suspend_rd_max)
- this->max_handlep1_ = suspend_rd_max;
- if (this->max_handlep1_ < suspend_wr_max)
- this->max_handlep1_ = suspend_wr_max;
- if (this->max_handlep1_ < suspend_ex_max)
- this->max_handlep1_ = suspend_ex_max;
-
- this->max_handlep1_++;
- }
- }
+ if (this->max_handlep1_ == handle + 1)
+ {
+ // We've deleted the last entry, so we need to figure out
+ // the last valid place in the array that is worth looking
+ // at.
+ ACE_HANDLE wait_rd_max = this->select_reactor_.wait_set_.rd_mask_.max_set ();
+ ACE_HANDLE wait_wr_max = this->select_reactor_.wait_set_.wr_mask_.max_set ();
+ ACE_HANDLE wait_ex_max = this->select_reactor_.wait_set_.ex_mask_.max_set ();
+
+ ACE_HANDLE suspend_rd_max = this->select_reactor_.suspend_set_.rd_mask_.max_set ();
+ ACE_HANDLE suspend_wr_max = this->select_reactor_.suspend_set_.wr_mask_.max_set ();
+ ACE_HANDLE suspend_ex_max = this->select_reactor_.suspend_set_.ex_mask_.max_set ();
+
+ // Compute the maximum of six values.
+ this->max_handlep1_ = wait_rd_max;
+ if (this->max_handlep1_ < wait_wr_max)
+ this->max_handlep1_ = wait_wr_max;
+ if (this->max_handlep1_ < wait_ex_max)
+ this->max_handlep1_ = wait_ex_max;
+
+ if (this->max_handlep1_ < suspend_rd_max)
+ this->max_handlep1_ = suspend_rd_max;
+ if (this->max_handlep1_ < suspend_wr_max)
+ this->max_handlep1_ = suspend_wr_max;
+ if (this->max_handlep1_ < suspend_ex_max)
+ this->max_handlep1_ = suspend_ex_max;
+
+ this->max_handlep1_++;
+ }
+
#endif /* ACE_WIN32 */
+ }
+
+ int requires_reference_counting =
+ event_handler->reference_counting_policy ().value () ==
+ ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
+
+ // Close down the <Event_Handler> unless we've been instructed not
+ // to.
+ if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::DONT_CALL) == 0)
+ event_handler->handle_close (handle, mask);
+
+ // Call remove_reference() if the removal is complete and reference
+ // counting is needed.
+ if (complete_removal &&
+ requires_reference_counting)
+ {
+ event_handler->remove_reference ();
+ }
+
return 0;
}
@@ -466,13 +523,13 @@ ACE_Select_Reactor_Handler_Repository::dump (void) const
this->max_handlep1_, this->max_size_));
ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("[")));
- ACE_Event_Handler *eh = 0;
+ ACE_Event_Handler *event_handler = 0;
for (ACE_Select_Reactor_Handler_Repository_Iterator iter (this);
- iter.next (eh) != 0;
+ iter.next (event_handler) != 0;
iter.advance ())
- ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT (" (eh = %x, eh->handle_ = %d)"),
- eh, eh->get_handle ()));
+ ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT (" (event_handler = %x, event_handler->handle_ = %d)"),
+ event_handler, event_handler->get_handle ()));
ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT (" ]")));
ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
@@ -549,6 +606,18 @@ ACE_Select_Reactor_Notify::purge_pending_notifications (ACE_Event_Handler *eh,
ACE_LIB_TEXT ("%p\n"),
ACE_LIB_TEXT ("enqueue_head")),
-1);
+
+ ACE_Event_Handler *event_handler = eh;
+
+ int requires_reference_counting =
+ event_handler->reference_counting_policy ().value () ==
+ ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
+
+ if (requires_reference_counting)
+ {
+ event_handler->remove_reference ();
+ }
+
++number_purged;
}
else
@@ -696,7 +765,7 @@ ACE_Select_Reactor_Notify::close (void)
}
int
-ACE_Select_Reactor_Notify::notify (ACE_Event_Handler *eh,
+ACE_Select_Reactor_Notify::notify (ACE_Event_Handler *event_handler,
ACE_Reactor_Mask mask,
ACE_Time_Value *timeout)
{
@@ -707,7 +776,22 @@ ACE_Select_Reactor_Notify::notify (ACE_Event_Handler *eh,
if (this->select_reactor_ == 0)
return 0;
- ACE_Notification_Buffer buffer (eh, mask);
+ ACE_Event_Handler_var safe_handler;
+
+ if (event_handler)
+ {
+ int requires_reference_counting =
+ event_handler->reference_counting_policy ().value () ==
+ ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
+
+ if (requires_reference_counting)
+ {
+ event_handler->add_reference ();
+ safe_handler = event_handler;
+ }
+ }
+
+ ACE_Notification_Buffer buffer (event_handler, mask);
#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
// Artificial scope to limit the duration of the mutex.
@@ -765,6 +849,9 @@ ACE_Select_Reactor_Notify::notify (ACE_Event_Handler *eh,
if (n == -1)
return -1;
+ // No failures.
+ safe_handler.release ();
+
return 0;
}
@@ -866,7 +953,9 @@ int
ACE_Select_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &buffer)
{
int result = 0;
+
#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
+
// Dispatch all messages that are in the <notify_queue_>.
{
// We acquire the lock in a block to make sure we're not
@@ -890,35 +979,8 @@ ACE_Select_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &buffer)
-1);
}
- // If eh == 0 then another thread is unblocking the
- // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s
- // internal structures. Otherwise, we need to dispatch the
- // appropriate handle_* method on the <ACE_Event_Handler>
- // pointer we've been passed.
- if (buffer.eh_ != 0)
- {
+#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
- switch (buffer.mask_)
- {
- case ACE_Event_Handler::READ_MASK:
- case ACE_Event_Handler::ACCEPT_MASK:
- result = buffer.eh_->handle_input (ACE_INVALID_HANDLE);
- break;
- case ACE_Event_Handler::WRITE_MASK:
- result = buffer.eh_->handle_output (ACE_INVALID_HANDLE);
- break;
- case ACE_Event_Handler::EXCEPT_MASK:
- result = buffer.eh_->handle_exception (ACE_INVALID_HANDLE);
- break;
- default:
- // Should we bail out if we get an invalid mask?
- ACE_ERROR ((LM_ERROR, ACE_LIB_TEXT ("invalid mask = %d\n"), buffer.mask_));
- }
- if (result == -1)
- buffer.eh_->handle_close (ACE_INVALID_HANDLE,
- ACE_Event_Handler::EXCEPT_MASK);
- }
-#else
// If eh == 0 then another thread is unblocking the
// <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s
// internal structures. Otherwise, we need to dispatch the
@@ -926,23 +988,30 @@ ACE_Select_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &buffer)
// pointer we've been passed.
if (buffer.eh_ != 0)
{
+ ACE_Event_Handler *event_handler =
+ buffer.eh_;
+
+ int requires_reference_counting =
+ event_handler->reference_counting_policy ().value () ==
+ ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
+
switch (buffer.mask_)
{
case ACE_Event_Handler::READ_MASK:
case ACE_Event_Handler::ACCEPT_MASK:
- result = buffer.eh_->handle_input (ACE_INVALID_HANDLE);
+ result = event_handler->handle_input (ACE_INVALID_HANDLE);
break;
case ACE_Event_Handler::WRITE_MASK:
- result = buffer.eh_->handle_output (ACE_INVALID_HANDLE);
+ result = event_handler->handle_output (ACE_INVALID_HANDLE);
break;
case ACE_Event_Handler::EXCEPT_MASK:
- result = buffer.eh_->handle_exception (ACE_INVALID_HANDLE);
+ result = event_handler->handle_exception (ACE_INVALID_HANDLE);
break;
case ACE_Event_Handler::QOS_MASK:
- result = buffer.eh_->handle_qos (ACE_INVALID_HANDLE);
+ result = event_handler->handle_qos (ACE_INVALID_HANDLE);
break;
case ACE_Event_Handler::GROUP_QOS_MASK:
- result = buffer.eh_->handle_group_qos (ACE_INVALID_HANDLE);
+ result = event_handler->handle_group_qos (ACE_INVALID_HANDLE);
break;
default:
// Should we bail out if we get an invalid mask?
@@ -950,12 +1019,16 @@ ACE_Select_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &buffer)
ACE_LIB_TEXT ("invalid mask = %d\n"),
buffer.mask_));
}
+
if (result == -1)
- buffer.eh_->handle_close (ACE_INVALID_HANDLE,
- ACE_Event_Handler::EXCEPT_MASK);
- }
+ event_handler->handle_close (ACE_INVALID_HANDLE,
+ ACE_Event_Handler::EXCEPT_MASK);
-#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
+ if (requires_reference_counting)
+ {
+ event_handler->remove_reference ();
+ }
+ }
return 1;
}