summaryrefslogtreecommitdiff
path: root/ace/TP_Reactor.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ace/TP_Reactor.cpp')
-rw-r--r--ace/TP_Reactor.cpp321
1 files changed, 52 insertions, 269 deletions
diff --git a/ace/TP_Reactor.cpp b/ace/TP_Reactor.cpp
index 16822dc36e7..0d6da2830fa 100644
--- a/ace/TP_Reactor.cpp
+++ b/ace/TP_Reactor.cpp
@@ -1,6 +1,5 @@
// $Id$
-
#include "ace/TP_Reactor.h"
#include "ace/Reactor.h"
#include "ace/Thread.h"
@@ -11,7 +10,6 @@
ACE_RCSID(ace, TP_Reactor, "$Id$")
-
ACE_ALLOC_HOOK_DEFINE (ACE_TP_Reactor)
int
@@ -39,7 +37,6 @@ ACE_TP_Token_Guard::grab_token (ACE_Time_Value *max_wait_time)
ACE_MT (result = this->token_.acquire_read (&ACE_TP_Reactor::no_op_sleep_hook));
}
- // Now that this thread owns the token let us make
// Check for timeouts and errors.
if (result == -1)
{
@@ -55,7 +52,6 @@ ACE_TP_Token_Guard::grab_token (ACE_Time_Value *max_wait_time)
return result;
}
-
int
ACE_TP_Token_Guard::acquire_token (ACE_Time_Value *max_wait_time)
{
@@ -79,7 +75,6 @@ ACE_TP_Token_Guard::acquire_token (ACE_Time_Value *max_wait_time)
ACE_MT (result = this->token_.acquire ());
}
- // Now that this thread owns the token let us make
// Check for timeouts and errors.
if (result == -1)
{
@@ -148,14 +143,14 @@ ACE_TP_Reactor::handle_events (ACE_Time_Value *max_wait_time)
// called.
ACE_Countdown_Time countdown (max_wait_time);
+ //
// The order of these events is very subtle, modify with care.
-
+ //
// Instantiate the token guard which will try grabbing the token for
// this thread.
ACE_TP_Token_Guard guard (this->token_);
-
int result = guard.grab_token (max_wait_time);
// If the guard is NOT the owner just return the retval
@@ -169,145 +164,10 @@ ACE_TP_Reactor::handle_events (ACE_Time_Value *max_wait_time)
// Update the countdown to reflect time waiting for the token.
countdown.update ();
-
return this->dispatch_i (max_wait_time,
guard);
}
-
-int
-ACE_TP_Reactor::remove_handler (ACE_Event_Handler *eh,
- ACE_Reactor_Mask mask)
-{
- int result = 0;
- // Artificial scoping for grabbing and releasing the token
- {
- ACE_TP_Token_Guard guard (this->token_);
-
- // Acquire the token
- result = guard.acquire_token ();
-
- if (!guard.is_owner ())
- return result;
-
- // Call the remove_handler_i () with a DONT_CALL mask. We dont
- // want to call the handle_close with the token held.
- result = this->remove_handler_i (eh->get_handle (),
- mask | ACE_Event_Handler::DONT_CALL);
-
- if (result == -1)
- return -1;
- }
-
- // Close down the <Event_Handler> unless we've been instructed not
- // to.
- if (result == 0 && (ACE_BIT_ENABLED (mask, ACE_Event_Handler::DONT_CALL) == 0))
- eh->handle_close (ACE_INVALID_HANDLE, mask);
-
- return 0;
-}
-
-int
-ACE_TP_Reactor::remove_handler (ACE_HANDLE handle,
- ACE_Reactor_Mask mask)
-{
-
- ACE_Event_Handler *eh = 0;
- int result = 0;
- // Artificial scoping for grabbing and releasing the token
- {
- ACE_TP_Token_Guard guard (this->token_);
-
- // Acquire the token
- result = guard.acquire_token ();
-
- if (!guard.is_owner ())
- return result;
-
- size_t slot = 0;
- eh = this->handler_rep_.find (handle, &slot);
-
- if (eh == 0)
- return -1;
-
- // Call the remove_handler_i () with a DONT_CALL mask. We dont
- // want to call the handle_close with the token held.
- result = this->remove_handler_i (handle,
- mask | ACE_Event_Handler::DONT_CALL);
-
- if (result == -1)
- return -1;
- }
-
- // Close down the <Event_Handler> unless we've been instructed not
- // to.
- // @@ Note: The check for result ==0 may be redundant, but shouldnt
- // be a problem.
- if (result ==0 && (ACE_BIT_ENABLED (mask, ACE_Event_Handler::DONT_CALL) == 0))
- eh->handle_close (handle, mask);
-
- return 0;
-}
-
-
-int
-ACE_TP_Reactor::remove_handler (const ACE_Handle_Set &handles,
- ACE_Reactor_Mask m)
-{
- // Array of <Event_Handlers> corresponding to <handles>
- ACE_Event_Handler **aeh = 0;
-
- // Allocate memory for the size of the handle set
- ACE_NEW_RETURN (aeh,
- ACE_Event_Handler *[handles.num_set ()],
- -1);
-
- size_t index = 0;
-
- // Artificial scoping for grabbing and releasing the token
- {
- ACE_TP_Token_Guard guard (this->token_);
-
- // Acquire the token
- int result = guard.acquire_token ();
-
- if (!guard.is_owner ())
- return result;
-
- ACE_HANDLE h;
-
- ACE_Handle_Set_Iterator handle_iter (handles);
-
- while ((h = handle_iter ()) != ACE_INVALID_HANDLE)
- {
- size_t slot = 0;
- ACE_Event_Handler *eh =
- this->handler_rep_.find (h, &slot);
-
- if (this->remove_handler_i (h,
- m | ACE_Event_Handler::DONT_CALL) == -1)
- {
- delete [] aeh;
- return -1;
- }
-
- aeh [index] = eh;
- index ++;
- }
- }
-
- // Close down the <Event_Handler> unless we've been instructed not
- // to.
- if (ACE_BIT_ENABLED (m, ACE_Event_Handler::DONT_CALL) == 0)
- {
- for (size_t i = 0; i < index; i++)
- aeh[i]->handle_close (ACE_INVALID_HANDLE, m);
- }
-
- delete [] aeh;
- return 0;
-}
-
int
ACE_TP_Reactor::register_handler (int,
ACE_Event_Handler *,
@@ -378,22 +238,6 @@ ACE_TP_Reactor::register_handler (const ACE_Handle_Set &handles,
}
int
-ACE_TP_Reactor::remove_handler (int /*signum*/,
- ACE_Sig_Action * /*new_disp*/,
- ACE_Sig_Action * /*old_disp*/,
- int /*sigkey*/)
-{
- ACE_NOTSUP_RETURN (-1);
-}
-
-int
-ACE_TP_Reactor::remove_handler (const ACE_Sig_Set & /*sigset*/)
-{
- ACE_NOTSUP_RETURN (-1);
-}
-
-
-int
ACE_TP_Reactor::dispatch_i (ACE_Time_Value *max_wait_time,
ACE_TP_Token_Guard &guard)
{
@@ -413,6 +257,7 @@ ACE_TP_Reactor::dispatch_i (ACE_Time_Value *max_wait_time,
// a later point of time, we decide to handle signals we have to
// release the lock before we make any upcalls.. What is here
// now is not the right thing...
+ //
// @@ We need to do better..
return this->handle_signals (event_count,
guard);
@@ -421,8 +266,8 @@ ACE_TP_Reactor::dispatch_i (ACE_Time_Value *max_wait_time,
// If there are no signals and if we had received a proper
// event_count then first look at dispatching timeouts. We need to
// handle timers early since they may have higher latency
- // constraints than I/O handlers. Ideally, the order of
- // dispatching should be a strategy...
+ // constraints than I/O handlers. Ideally, the order of dispatching
+ // should be a strategy...
// NOTE: The event count does not have the number of timers that
// needs dispatching. But we are still passing this along. We dont
@@ -435,14 +280,13 @@ ACE_TP_Reactor::dispatch_i (ACE_Time_Value *max_wait_time,
if (result > 0)
return result;
-
- // Else justgo ahead fall through for further handling
+ // Else just go ahead fall through for further handling.
if (event_count > 0)
{
// Next dispatch the notification handlers (if there are any to
- // dispatch). These are required to handle multiple-threads that
- // are trying to update the <Reactor>.
+ // dispatch). These are required to handle multiple-threads
+ // that are trying to update the <Reactor>.
result = this->handle_notify_events (event_count,
guard);
@@ -460,12 +304,8 @@ ACE_TP_Reactor::dispatch_i (ACE_Time_Value *max_wait_time,
}
return 0;
-
}
-
-
-
int
ACE_TP_Reactor::handle_signals (int & /*event_count*/,
ACE_TP_Token_Guard & /*guard*/)
@@ -496,7 +336,7 @@ ACE_TP_Reactor::handle_signals (int & /*event_count*/,
// result of signals they should be dispatched since
// they may be time critical...
active_handle_count = this->any_ready (dispatch_set);
- #else
+#else
// active_handle_count = 0;
#endif
@@ -525,14 +365,25 @@ ACE_TP_Reactor::handle_timer_events (int & /*event_count*/,
if (this->timer_queue_->dispatch_info (cur_time,
info))
{
+ const void *upcall_act = 0;
+
+ // Preinvoke.
+ this->timer_queue_->preinvoke (info,
+ cur_time,
+ upcall_act);
+
// Release the token before dispatching notifies...
guard.release_token ();
// call the functor
- this->timer_queue_->upcall (info.type_,
- info.act_,
+ this->timer_queue_->upcall (info,
cur_time);
+ // Postinvoke
+ this->timer_queue_->postinvoke (info,
+ cur_time,
+ upcall_act);
+
// We have dispatched a timer
return 1;
}
@@ -540,8 +391,6 @@ ACE_TP_Reactor::handle_timer_events (int & /*event_count*/,
return 0;
}
-
-
int
ACE_TP_Reactor::handle_notify_events (int & /*event_count*/,
ACE_TP_Token_Guard &guard)
@@ -568,7 +417,7 @@ ACE_TP_Reactor::handle_notify_events (int & /*event_count*/,
while (this->notify_handler_->read_notify_pipe (notify_handle,
buffer) > 0)
{
- // Just figure out whether we can read any buffer that has
+ // Just figure out whether we can read any buffer that has
// dispatchable info. If not we have just been unblocked by
// another thread trying to update the reactor. If we get any
// buffer that needs dispatching we will dispatch that after
@@ -612,109 +461,54 @@ ACE_TP_Reactor::handle_socket_events (int &event_count,
return 0;
}
- // Suspend the handler so that other threads don't start
- // dispatching it.
+ // Suspend the handler so that other threads don't start dispatching
+ // it.
+ //
// NOTE: This check was performed in older versions of the
// TP_Reactor. Looks like it is a waste..
if (dispatch_info.event_handler_ != this->notify_handler_)
this->suspend_i (dispatch_info.handle_);
- // Release the lock. Others threads can start waiting.
- guard.release_token ();
+ int resume_flag =
+ dispatch_info.event_handler_->resume_handler ();
- int result = 0;
+ int reference_counting_required =
+ dispatch_info.event_handler_->reference_counting_policy ().value () ==
+ ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
- // If there was an event handler ready, dispatch it.
- // Decrement the event left
- --event_count;
-
- if (this->dispatch_socket_event (dispatch_info) == 0)
- ++result; // Dispatched an event
-
- // This is to get around a problem/ which is well described in
- // 1361. This is just a work around that would help applications
- // from resuming handles at the most inopportune moment.
- int flag =
- ACE_Event_Handler::ACE_EVENT_HANDLER_NOT_RESUMED;
-
- // Acquire the token since we want to access the handler
- // repository. The call to find () does not hold a lock and hence
- // this is required.
- guard.acquire_token ();
-
- // Get the handler for the handle that we have dispatched to.
- ACE_Event_Handler *eh =
- this->handler_rep_.find (dispatch_info.handle_);
-
- // This check is required for the following reasons
- // 1. If dispatch operation returned a -1, then there is a
- // possibility that the event handler could be deleted. In such
- // cases the pointer to the event_handler that <dispatch_info>
- // holds is set to 0.
- //
- // 2. If the application did its own memory management, a return
- // value of 0 cannot be believed since the event handler could
- // be deleted by the application based on some conditions. This
- // is *bad*. But we dont have much of a choice with the existing
- // reactor setup. To get around this, we can make a check for
- // the handler registered with the repository for the handle
- // that we have and compare with the handler that we
- // posses. Yeah, I know it is like touching your nose by taking
- // your hand around your head. But that is the way it is. This
- // is a fix for [BUGID 1231]
-
- if (dispatch_info.event_handler_ != 0 &&
- eh == dispatch_info.event_handler_)
+ // Call add_reference() if needed.
+ if (reference_counting_required)
{
- flag =
- dispatch_info.event_handler_->resume_handler ();
+ dispatch_info.event_handler_->add_reference ();
}
- // Use resume_i () since we hold the token already.
- if (dispatch_info.handle_ != ACE_INVALID_HANDLE &&
- dispatch_info.event_handler_ != this->notify_handler_ &&
- flag == ACE_Event_Handler::ACE_REACTOR_RESUMES_HANDLER)
- this->resume_i (dispatch_info.handle_);
-
- // Let me release the token here. This is not required since the
- // destruction of the object on the stack will take care of this.
+ // Release the lock. Others threads can start waiting.
guard.release_token ();
- return result;
-}
-
-int
-ACE_TP_Reactor::mask_ops (ACE_HANDLE handle,
- ACE_Reactor_Mask mask,
- int ops)
-{
- ACE_TRACE ("ACE_TP_Reactor::mask_ops");
- ACE_MT (ACE_GUARD_RETURN (ACE_Select_Reactor_Token,
- ace_mon, this->token_, -1));
-
int result = 0;
- // If it looks like the handle isn't suspended, then
- // set the ops on the wait_set_, otherwise set the suspend_set_.
+ // If there was an event handler ready, dispatch it.
+ // Decrement the event left
+ --event_count;
- if (this->suspend_set_.rd_mask_.is_set (handle) == 0
- && this->suspend_set_.wr_mask_.is_set (handle) == 0
- && this->suspend_set_.ex_mask_.is_set (handle) == 0)
+ // Dispatched an event
+ if (this->dispatch_socket_event (dispatch_info) == 0)
+ ++result;
- result = this->bit_ops (handle, mask,
- this->wait_set_,
- ops);
- else
+ // Resume handler if required.
+ if (dispatch_info.event_handler_ != this->notify_handler_ &&
+ resume_flag == ACE_Event_Handler::ACE_REACTOR_RESUMES_HANDLER)
+ this->resume_handler (dispatch_info.handle_);
- result = this->bit_ops (handle, mask,
- this->suspend_set_,
- ops);
+ // Call remove_reference() if needed.
+ if (reference_counting_required)
+ {
+ dispatch_info.event_handler_->remove_reference ();
+ }
return result;
}
-
-
int
ACE_TP_Reactor::get_event_for_dispatching (ACE_Time_Value *max_wait_time)
{
@@ -756,7 +550,8 @@ ACE_TP_Reactor::get_event_for_dispatching (ACE_Time_Value *max_wait_time)
int
ACE_TP_Reactor::get_socket_event_info (ACE_EH_Dispatch_Info &event)
{
- event.reset (); // Nothing to dispatch yet
+ // Nothing to dispatch yet
+ event.reset ();
// Check for dispatch in write, except, read. Only catch one, but if
// one is caught, be sure to clear the handle from each mask in case
@@ -863,10 +658,6 @@ ACE_TP_Reactor::dispatch_socket_event (ACE_EH_Dispatch_Info &dispatch_info)
int retval =
this->remove_handler (handle, mask);
- // As the handler is no longer valid, invalidate the handle
- dispatch_info.event_handler_ = 0;
- dispatch_info.handle_ = ACE_INVALID_HANDLE;
-
return retval;
}
@@ -886,14 +677,6 @@ ACE_TP_Reactor::handle_events (ACE_Time_Value &max_wait_time)
return this->handle_events (&max_wait_time);
}
-int
-ACE_TP_Reactor::mask_ops (ACE_Event_Handler *eh,
- ACE_Reactor_Mask mask,
- int ops)
-{
- return this->mask_ops (eh->get_handle (), mask, ops);
-}
-
void
ACE_TP_Reactor::notify_handle (ACE_HANDLE,
ACE_Reactor_Mask,