summaryrefslogtreecommitdiff
path: root/ace/TP_Reactor.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ace/TP_Reactor.cpp')
-rw-r--r--ace/TP_Reactor.cpp495
1 files changed, 259 insertions, 236 deletions
diff --git a/ace/TP_Reactor.cpp b/ace/TP_Reactor.cpp
index 8b8fbc97c40..2fffb8b6b9a 100644
--- a/ace/TP_Reactor.cpp
+++ b/ace/TP_Reactor.cpp
@@ -4,7 +4,6 @@
#include "ace/TP_Reactor.h"
#include "ace/Reactor.h"
#include "ace/Thread.h"
-#include "ace/Log_Msg.h"
#if !defined (__ACE_INLINE__)
#include "ace/TP_Reactor.i"
@@ -56,9 +55,6 @@ ACE_TP_Token_Guard::grab_token (ACE_Time_Value *max_wait_time)
return result;
}
-/************************************************************************/
-// Methods for ACE_TP_Reactor
-/************************************************************************/
ACE_TP_Reactor::ACE_TP_Reactor (ACE_Sig_Handler *sh,
ACE_Timer_Queue *tq,
@@ -80,23 +76,25 @@ ACE_TP_Reactor::ACE_TP_Reactor (size_t size,
this->supress_notify_renew (1);
}
-
-
-void
-ACE_TP_Reactor::max_notify_iterations (int /*iterations*/)
+int
+ACE_TP_Reactor::owner (ACE_thread_t, ACE_thread_t *o_id)
{
- ACE_TRACE ("ACE_TP_Reactor::max_notify_iterations");
+ ACE_TRACE ("ACE_TP_Reactor::owner");
+ if (o_id)
+ *o_id = ACE_Thread::self ();
+
+ return 0;
- ACE_ERROR ((LM_ERROR,
- "(%P|%t) This has no effect on the notify processing \n"));
}
int
-ACE_TP_Reactor::max_notify_iterations (void)
+ACE_TP_Reactor::owner (ACE_thread_t *t_id)
{
- ACE_TRACE ("ACE_TP_Reactor::max_notify_iterations");
+ ACE_TRACE ("ACE_TP_Reactor::owner");
+ *t_id = ACE_Thread::self ();
return 0;
+
}
@@ -105,42 +103,50 @@ ACE_TP_Reactor::handle_events (ACE_Time_Value *max_wait_time)
{
ACE_TRACE ("ACE_TP_Reactor::handle_events");
- int result = 0;
-
// Stash the current time -- the destructor of this object will
// automatically compute how much time elpased since this method was
// called.
ACE_Countdown_Time countdown (max_wait_time);
+ // The order of these events is very subtle, modify with care.
+
+
// Instantiate the token guard which will try grabbing the token for
// this thread.
- ACE_TP_Token_Guard guard (this->token_,
- max_wait_time,
- result);
+ ACE_TP_Token_Guard guard (this->token_);
+
+
+ int result = guard.grab_token (max_wait_time);
// If the guard is NOT the owner just return the retval
if (!guard.is_owner ())
return result;
+ // After getting the lock just just for deactivation..
+ if (this->deactivated_)
+ return -1;
+
// Update the countdown to reflect time waiting for the token.
countdown.update ();
- // After acquiring the lock, check if we have been deactivated. If
- // we are deactivated, simply return without handling further
- // events.
- if (this->deactivated_)
- {
- return -1;
- }
+ return this->dispatch_i (max_wait_time,
+ guard);
+}
- // We got the lock, lets handle some events. We collect the events
- // that we need to handle. We release the token and then handle
- // those events that needs handling.
+
+int
+ACE_TP_Reactor::dispatch_i (ACE_Time_Value *max_wait_time,
+ ACE_TP_Token_Guard &guard)
+{
int event_count =
this->get_event_for_dispatching (max_wait_time);
+ int result = 0;
+ // Note: We are passing the <event_count> around, to have record of
+ // how many events still need processing. May be this could be
+ // useful in future.
// Dispatch signals
if (event_count == -1)
@@ -161,14 +167,14 @@ ACE_TP_Reactor::handle_events (ACE_Time_Value *max_wait_time)
// handle timers early since they may have higher latency
// constraints than I/O handlers. Ideally, the order of
// dispatching should be a strategy...
- int retval = this->handle_timer_events (event_count,
- guard);
+ result = this->handle_timer_events (event_count,
+ guard);
- if (retval > 0)
- return retval;
+ if (result > 0)
+ return result;
- // Else just fall through for further handling
- }
+ // Else just fall through for further handling
+ }
if (event_count > 0)
@@ -176,128 +182,27 @@ ACE_TP_Reactor::handle_events (ACE_Time_Value *max_wait_time)
// Next dispatch the notification handlers (if there are any to
// dispatch). These are required to handle multiple-threads that
// are trying to update the <Reactor>.
- int retval = this->handle_notify_events (event_count,
- guard);
+ result = this->handle_notify_events (event_count,
+ guard);
- if (retval > 0)
- return retval;
+ if (result > 0)
+ return result;
- // Else just fall through for further handling
+ // Else just fall through for further handling
}
-
- if (event_count > 0)
- {
- // Handle socket events
- return this->handle_socket_events (event_count,
- guard);
- }
-
- return 0;
-}
-
-int
-ACE_TP_Reactor::handle_events (ACE_Time_Value &max_wait_time)
-{
- ACE_TRACE ("ACE_TP_Reactor::handle_events");
- return this->handle_events (&max_wait_time);
-}
-
-int
-ACE_TP_Reactor::resumable_handler (void)
-{
- return 1;
-}
-
-int
-ACE_TP_Reactor::mask_ops (ACE_HANDLE handle,
- ACE_Reactor_Mask mask,
- int ops)
-{
- ACE_TRACE ("ACE_TP_Reactor::mask_ops");
- ACE_MT (ACE_GUARD_RETURN (ACE_Select_Reactor_Token,
- ace_mon, this->token_, -1));
-
- int result = 0;
-
- // If it looks like the handle isn't suspended, then
- // set the ops on the wait_set_, otherwise set the suspend_set_.
-
- if (this->suspend_set_.rd_mask_.is_set (handle) == 0
- && this->suspend_set_.wr_mask_.is_set (handle) == 0
- && this->suspend_set_.ex_mask_.is_set (handle) == 0)
-
- result = this->bit_ops (handle, mask,
- this->wait_set_,
- ops);
- else
-
- result = this->bit_ops (handle, mask,
- this->suspend_set_,
- ops);
-
- return result;
-}
-
-
-int
-ACE_TP_Reactor::mask_ops (ACE_Event_Handler *eh,
- ACE_Reactor_Mask mask,
- int ops)
-{
- ACE_TRACE ("ACE_TP_Reactor::mask_ops");
- return this->mask_ops (eh->get_handle (), mask, ops);
-}
-
-
-int
-ACE_TP_Reactor::owner (ACE_thread_t, ACE_thread_t *o_id)
-{
- ACE_TRACE ("ACE_TP_Reactor::owner");
- if (o_id)
- *o_id = ACE_Thread::self ();
+ if (event_count > 0)
+ {
+ // Handle socket events
+ return this->handle_socket_events (event_count,
+ guard);
+ }
return 0;
}
-int
-ACE_TP_Reactor::owner (ACE_thread_t *t_id)
-{
- ACE_TRACE ("ACE_TP_Reactor::owner");
- *t_id = ACE_Thread::self ();
- return 0;
-}
-
-
-int
-ACE_TP_Reactor::get_event_for_dispatching (ACE_Time_Value *max_wait_time)
-{
- ACE_TRACE ("ACE_TP_Reactor::get_event_for_dispatching");
- // If the reactor handler state has changed, clear any remembered
- // ready bits and re-scan from the master wait_set.
- if (this->state_changed_)
- {
- this->ready_set_.rd_mask_.reset ();
- this->ready_set_.wr_mask_.reset ();
- this->ready_set_.ex_mask_.reset ();
- this->state_changed_ = 0;
- }
- else
- {
- // This is a hack... somewhere, under certain conditions (which
- // I don't understand...) the mask will have all of its bits clear,
- // yet have a size_ > 0. This is an attempt to remedy the affect,
- // without knowing why it happens.
- this->ready_set_.rd_mask_.sync (this->ready_set_.rd_mask_.max_set ());
- this->ready_set_.wr_mask_.sync (this->ready_set_.wr_mask_.max_set ());
- this->ready_set_.ex_mask_.sync (this->ready_set_.ex_mask_.max_set ());
- }
-
- return this->wait_for_multiple_events (this->ready_set_,
- max_wait_time);
-}
int
@@ -378,6 +283,7 @@ ACE_TP_Reactor::handle_timer_events (int &event_count,
}
+
int
ACE_TP_Reactor::handle_notify_events (int &event_count,
ACE_TP_Token_Guard &guard)
@@ -388,61 +294,83 @@ ACE_TP_Reactor::handle_notify_events (int &event_count,
if (notify_handle != ACE_INVALID_HANDLE)
{
+
// Clear the handle of the read_mask of our <ready_set_>
this->ready_set_.rd_mask_.clr_bit (notify_handle);
- // Decrement the number of events that needs handling yet.
- event_count--;
+ // Now just do a read on the pipe..
+ ACE_Notification_Buffer buffer;
- // Release the token before dispatching notifies...
- guard.release_token ();
+ if (this->notify_handler_->read_notify_pipe (notify_handle,
+ buffer) >= 0)
+ {
+ event_count--;
+
+ // Release the token before dispatching notifies...
+ guard.release_token ();
+
+ this->notify_handler_->dispatch_notify (buffer);
+
+ this->renew ();
+ return 1;
+ }
- // Dipatch and return
- return
- this->notify_handler_->dispatch_notify (notify_handle);
+ return 0;
}
return 0;
}
+
int
ACE_TP_Reactor::handle_socket_events (int &event_count,
ACE_TP_Token_Guard &guard)
{
- ACE_TRACE ("ACE_TP_Reactor::handle_socket_events");
+ // We got the lock, lets handle some events. Note: this method will
+ // *not* dispatch any I/O handlers. It will dispatch signals,
+ // timeouts, and notifications.
ACE_EH_Dispatch_Info dispatch_info;
- // Get the socket event dispatch information
+ this->get_socket_event_info (dispatch_info);
+
// If there is any event handler that is ready to be dispatched, the
// dispatch information is recorded in dispatch_info.
- int result =
- this->get_socket_event_info (dispatch_info);
-
-
- if (result)
+ if (dispatch_info.dispatch ())
{
// Suspend the handler so that other threads don't start
// dispatching it.
- this->suspend_i (dispatch_info.handle_);
+ // Make sure we never suspend the notify_handler_ without holding
+ // the lock.
+ // @@ Actually, we don't even need to suspend the notify_handler_
+ // here. But let me get it to work first.
+ if (dispatch_info.event_handler_ != this->notify_handler_)
+ this->suspend_i (dispatch_info.handle_);
+ }
- // Decrement the number of events that needs handling yet.
- event_count--;
+ /// Decrement the event left
+ --event_count;
- // Release the token before dispatching notifies...
- guard.release_token ();
+ // Release the lock. Others threads can start waiting.
+ guard.release_token ();
- result = this->dispatch_socket_events (dispatch_info);
+ int result = 0;
+ // If there was an event handler ready, dispatch it.
+ if (dispatch_info.dispatch ())
+ {
+ if (this->dispatch_socket_event (dispatch_info) == 0)
+ ++result; // Dispatched one more event
int flag = 0;
if (dispatch_info.event_handler_ != 0)
{
- flag =
- dispatch_info.event_handler_->resume_handler ();
+ flag =
+ dispatch_info.event_handler_->resume_handler ();
}
if (dispatch_info.handle_ != ACE_INVALID_HANDLE &&
+ dispatch_info.event_handler_ != this->notify_handler_ &&
flag == 0)
this->resume_handler (dispatch_info.handle_);
}
@@ -450,67 +378,93 @@ ACE_TP_Reactor::handle_socket_events (int &event_count,
return result;
}
+int
+ACE_TP_Reactor::mask_ops (ACE_HANDLE handle,
+ ACE_Reactor_Mask mask,
+ int ops)
+{
+ ACE_TRACE ("ACE_TP_Reactor::mask_ops");
+ ACE_MT (ACE_GUARD_RETURN (ACE_Select_Reactor_Token,
+ ace_mon, this->token_, -1));
-ACE_HANDLE
-ACE_TP_Reactor::get_notify_handle (void)
+ int result = 0;
+
+ // If it looks like the handle isn't suspended, then
+ // set the ops on the wait_set_, otherwise set the suspend_set_.
+
+ if (this->suspend_set_.rd_mask_.is_set (handle) == 0
+ && this->suspend_set_.wr_mask_.is_set (handle) == 0
+ && this->suspend_set_.ex_mask_.is_set (handle) == 0)
+
+ result = this->bit_ops (handle, mask,
+ this->wait_set_,
+ ops);
+ else
+
+ result = this->bit_ops (handle, mask,
+ this->suspend_set_,
+ ops);
+
+ return result;
+}
+
+
+
+int
+ACE_TP_Reactor::get_event_for_dispatching (ACE_Time_Value *max_wait_time)
{
- // Call the notify handler to get a handle on which we would have a
- // notify waiting
- ACE_HANDLE read_handle =
- this->notify_handler_->notify_handle ();
- // Check whether the rd_mask has been set on that handle. If so
- // return the handle.
- if (read_handle != ACE_INVALID_HANDLE &&
- this->ready_set_.rd_mask_.is_set (read_handle))
+ // If the reactor handler state has changed, clear any remembered
+ // ready bits and re-scan from the master wait_set.
+ if (this->state_changed_)
{
- return read_handle;
+ this->ready_set_.rd_mask_.reset ();
+ this->ready_set_.wr_mask_.reset ();
+ this->ready_set_.ex_mask_.reset ();
+ this->state_changed_ = 0;
}
+ else
+ {
+ // This is a hack... somewhere, under certain conditions (which
+ // I don't understand...) the mask will have all of its bits clear,
+ // yet have a size_ > 0. This is an attempt to remedy the affect,
+ // without knowing why it happens.
- return ACE_INVALID_HANDLE;
-}
-
+ //# if !(defined (__SUNPRO_CC) && (__SUNPRO_CC > 0x500))
+ // SunCC seems to be having problems with this piece of code
+ // here. I am not sure why though. This works fine with other
+ // compilers. As we dont seem to understand when this piece of
+ // code is needed and as it creates problems for SunCC we will
+ // not compile this. Most of the tests in TAO seem to be happy
+ // without this in SunCC.
+ this->ready_set_.rd_mask_.sync (this->ready_set_.rd_mask_.max_set ());
+ this->ready_set_.wr_mask_.sync (this->ready_set_.wr_mask_.max_set ());
+ this->ready_set_.ex_mask_.sync (this->ready_set_.ex_mask_.max_set ());
+ //# endif /* ! __SUNPRO_CC */
+ }
+ return this->wait_for_multiple_events (this->ready_set_,
+ max_wait_time);
+}
int
ACE_TP_Reactor::get_socket_event_info (ACE_EH_Dispatch_Info &event)
{
+ event.reset (); // Nothing to dispatch yet
// Check for dispatch in write, except, read. Only catch one, but if
// one is caught, be sure to clear the handle from each mask in case
// there is more than one mask set for it. This would cause problems
// if the handler is suspended for dispatching, but its set bit in
// another part of ready_set_ kept it from being dispatched.
+ int found_io = 0;
ACE_HANDLE handle;
- // Look at the read masks
- {
- ACE_Handle_Set_Iterator handle_iter (this->ready_set_.rd_mask_);
-
- while ((handle = handle_iter ()) != ACE_INVALID_HANDLE)
- {
- if (this->is_suspended_i (handle))
- continue;
-
- // Remember this info
- event.set (handle,
- this->handler_rep_.find (handle),
- ACE_Event_Handler::READ_MASK,
- &ACE_Event_Handler::handle_input);
- this->ready_set_.rd_mask_.clr_bit (handle);
- this->ready_set_.wr_mask_.clr_bit (handle);
- this->ready_set_.ex_mask_.clr_bit (handle);
- return 1;
- }
- }
-
- // We havent found any rd_masks for processing yet, so look for
- // write masks
{
ACE_Handle_Set_Iterator handle_iter (this->ready_set_.wr_mask_);
- while ((handle = handle_iter ()) != ACE_INVALID_HANDLE)
+ while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE)
{
if (this->is_suspended_i (handle))
continue;
@@ -523,45 +477,67 @@ ACE_TP_Reactor::get_socket_event_info (ACE_EH_Dispatch_Info &event)
this->ready_set_.wr_mask_.clr_bit (handle);
this->ready_set_.ex_mask_.clr_bit (handle);
this->ready_set_.rd_mask_.clr_bit (handle);
- return 1;
+ found_io = 1;
}
}
- // We havent found any rd_mask and wr_masks for processing yet, so
- // look for ex_masks
- {
- ACE_Handle_Set_Iterator handle_iter (this->ready_set_.ex_mask_);
+ if (!found_io)
+ {
+ ACE_Handle_Set_Iterator handle_iter (this->ready_set_.ex_mask_);
- while ((handle = handle_iter ()) != ACE_INVALID_HANDLE)
- {
- if (this->is_suspended_i (handle))
- continue;
+ while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE)
+ {
+ if (this->is_suspended_i (handle))
+ continue;
+
+ // Remember this info
+ event.set (handle,
+ this->handler_rep_.find (handle),
+ ACE_Event_Handler::EXCEPT_MASK,
+ &ACE_Event_Handler::handle_exception);
+ this->ready_set_.ex_mask_.clr_bit (handle);
+ this->ready_set_.wr_mask_.clr_bit (handle);
+ this->ready_set_.rd_mask_.clr_bit (handle);
+ found_io = 1;
+ }
+ }
- // Remember this info
- event.set (handle,
- this->handler_rep_.find (handle),
- ACE_Event_Handler::EXCEPT_MASK,
- &ACE_Event_Handler::handle_exception);
- this->ready_set_.ex_mask_.clr_bit (handle);
- this->ready_set_.wr_mask_.clr_bit (handle);
- this->ready_set_.rd_mask_.clr_bit (handle);
- return 1;
- }
+ if (!found_io)
+ {
+ ACE_Handle_Set_Iterator handle_iter (this->ready_set_.rd_mask_);
+
+ while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE)
+ {
+ if (this->is_suspended_i (handle))
+ continue;
+
+ // Remember this info
+ event.set (handle,
+ this->handler_rep_.find (handle),
+ ACE_Event_Handler::READ_MASK,
+ &ACE_Event_Handler::handle_input);
+ this->ready_set_.rd_mask_.clr_bit (handle);
+ this->ready_set_.wr_mask_.clr_bit (handle);
+ this->ready_set_.ex_mask_.clr_bit (handle);
+ found_io = 1;
+ }
}
- // We didnt find any..
- return 0;
+ return found_io;
}
+
+
+// Dispatches a single event handler
int
-ACE_TP_Reactor::dispatch_socket_events (ACE_EH_Dispatch_Info &info)
+ACE_TP_Reactor::dispatch_socket_event (ACE_EH_Dispatch_Info &dispatch_info)
{
- ACE_TRACE ("ACE_TP_Reactor::dispatch_socket_events");
+ ACE_TRACE ("ACE_TP_Reactor::notify_handle");
- ACE_HANDLE handle = info.handle_;
- ACE_Event_Handler *event_handler = info.event_handler_;
- ACE_Reactor_Mask mask = info.mask_;
- ACE_EH_PTMF callback = info.callback_;
+ ACE_HANDLE handle = dispatch_info.handle_;
+ ACE_Event_Handler *event_handler = dispatch_info.event_handler_;
+ ACE_Reactor_Mask mask = dispatch_info.mask_;
+ ACE_EH_PTMF callback = dispatch_info.callback_;
// Check for removed handlers.
if (event_handler == 0)
@@ -583,17 +559,37 @@ ACE_TP_Reactor::dispatch_socket_events (ACE_EH_Dispatch_Info &info)
this->remove_handler (handle, mask);
// As the handler is no longer valid, invalidate the handle
- info.event_handler_ = 0;
- info.handle_ = ACE_INVALID_HANDLE;
+ dispatch_info.event_handler_ = 0;
+ dispatch_info.handle_ = ACE_INVALID_HANDLE;
return retval;
}
+ // assert (status >= 0);
+ return 0;
+}
+
+int
+ACE_TP_Reactor::resumable_handler (void)
+{
return 1;
}
+ACE_INLINE int
+ACE_TP_Reactor::handle_events (ACE_Time_Value &max_wait_time)
+{
+ return ACE_Select_Reactor::handle_events (max_wait_time);
+}
+
+ACE_INLINE int
+ACE_TP_Reactor::mask_ops (ACE_Event_Handler *eh,
+ ACE_Reactor_Mask mask,
+ int ops)
+{
+ return this->mask_ops (eh->get_handle (), mask, ops);
+}
-void
+ACE_INLINE void
ACE_TP_Reactor::notify_handle (ACE_HANDLE,
ACE_Reactor_Mask,
ACE_Handle_Set &,
@@ -603,3 +599,30 @@ ACE_TP_Reactor::notify_handle (ACE_HANDLE,
ACE_ERROR ((LM_ERROR,
ACE_LIB_TEXT ("ACE_TP_Reactor::notify_handle: Wrong version of notify_handle() gets called")));
}
+
+ACE_HANDLE
+ACE_TP_Reactor::get_notify_handle (void)
+{
+ // Call the notify handler to get a handle on which we would have a
+ // notify waiting
+ ACE_HANDLE read_handle =
+ this->notify_handler_->notify_handle ();
+
+ // Check whether the rd_mask has been set on that handle. If so
+ // return the handle.
+ // if (read_handle != ACE_INVALID_HANDLE &&
+ //this->ready_set_.rd_mask_.is_set (read_handle))
+ if (read_handle != ACE_INVALID_HANDLE)
+ {
+ ACE_Handle_Set_Iterator handle_iter (this->ready_set_.rd_mask_);
+ ACE_HANDLE handle = ACE_INVALID_HANDLE;
+
+ while ((handle = handle_iter ()) == read_handle)
+ {
+ return read_handle;
+ }
+ }
+
+ // None found..
+ return ACE_INVALID_HANDLE;
+}