summaryrefslogtreecommitdiff
path: root/ace/Other/TP_Reactor.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ace/Other/TP_Reactor.cpp')
-rw-r--r--ace/Other/TP_Reactor.cpp829
1 files changed, 829 insertions, 0 deletions
diff --git a/ace/Other/TP_Reactor.cpp b/ace/Other/TP_Reactor.cpp
new file mode 100644
index 00000000000..e6b0d189f89
--- /dev/null
+++ b/ace/Other/TP_Reactor.cpp
@@ -0,0 +1,829 @@
+// $Id$
+
+
+#include "ace/TP_Reactor.h"
+#include "ace/Reactor.h"
+#include "ace/Thread.h"
+
+#if !defined (__ACE_INLINE__)
+#include "ace/TP_Reactor.i"
+#endif /* __ACE_INLINE__ */
+
+ACE_RCSID(ace, TP_Reactor, "$Id$")
+
+
+ACE_ALLOC_HOOK_DEFINE (ACE_TP_Reactor)
+
+int
+ACE_TP_Token_Guard::grab_token (ACE_Time_Value *max_wait_time)
+{
+ ACE_TRACE ("ACE_TP_Token_Guard::grab_token");
+
+ // The order of these events is very subtle, modify with care.
+
+ // Try to grab the lock. If someone if already there, don't wake
+ // them up, just queue up in the thread pool.
+ int result = 0;
+
+ if (max_wait_time)
+ {
+ ACE_Time_Value tv = ACE_OS::gettimeofday ();
+ tv += *max_wait_time;
+
+ ACE_MT (result = this->token_.acquire_read (&ACE_TP_Reactor::no_op_sleep_hook,
+ 0,
+ &tv));
+ }
+ else
+ {
+ ACE_MT (result = this->token_.acquire_read (&ACE_TP_Reactor::no_op_sleep_hook));
+ }
+
+ // Now that this thread owns the token let us make
+ // Check for timeouts and errors.
+ if (result == -1)
+ {
+ if (errno == ETIME)
+ return 0;
+ else
+ return -1;
+ }
+
+ // We got the token and so let us mark ourseleves as owner
+ this->owner_ = 1;
+
+ return result;
+}
+
+
+int
+ACE_TP_Token_Guard::acquire_token (ACE_Time_Value *max_wait_time)
+{
+ ACE_TRACE ("ACE_TP_Token_Guard::acquire_token");
+
+ // Try to grab the lock. If someone if already there, don't wake
+ // them up, just queue up in the thread pool.
+ int result = 0;
+
+ if (max_wait_time)
+ {
+ ACE_Time_Value tv = ACE_OS::gettimeofday ();
+ tv += *max_wait_time;
+
+ ACE_MT (result = this->token_.acquire (0,
+ 0,
+ &tv));
+ }
+ else
+ {
+ ACE_MT (result = this->token_.acquire ());
+ }
+
+ // Now that this thread owns the token let us make
+ // Check for timeouts and errors.
+ if (result == -1)
+ {
+ if (errno == ETIME)
+ return 0;
+ else
+ return -1;
+ }
+
+ // We got the token and so let us mark ourseleves as owner
+ this->owner_ = 1;
+
+ return result;
+}
+
+
+
+ACE_TP_Reactor::ACE_TP_Reactor (ACE_Sig_Handler *sh,
+ ACE_Timer_Queue *tq,
+ int mask_signals)
+ : ACE_Select_Reactor (sh, tq, 0, 0, mask_signals)
+{
+ ACE_TRACE ("ACE_TP_Reactor::ACE_TP_Reactor");
+ this->supress_notify_renew (1);
+}
+
+ACE_TP_Reactor::ACE_TP_Reactor (size_t size,
+ int rs,
+ ACE_Sig_Handler *sh,
+ ACE_Timer_Queue *tq,
+ int mask_signals)
+ : ACE_Select_Reactor (size, rs, sh, tq, 0, 0, mask_signals)
+{
+ ACE_TRACE ("ACE_TP_Reactor::ACE_TP_Reactor");
+ this->supress_notify_renew (1);
+}
+
+int
+ACE_TP_Reactor::owner (ACE_thread_t, ACE_thread_t *o_id)
+{
+ ACE_TRACE ("ACE_TP_Reactor::owner");
+ if (o_id)
+ *o_id = ACE_Thread::self ();
+
+ return 0;
+}
+
+int
+ACE_TP_Reactor::owner (ACE_thread_t *t_id)
+{
+ ACE_TRACE ("ACE_TP_Reactor::owner");
+ *t_id = ACE_Thread::self ();
+
+ return 0;
+}
+
+int
+ACE_TP_Reactor::handle_events (ACE_Time_Value *max_wait_time)
+{
+ ACE_TRACE ("ACE_TP_Reactor::handle_events");
+
+ // Stash the current time -- the destructor of this object will
+ // automatically compute how much time elpased since this method was
+ // called.
+ ACE_Countdown_Time countdown (max_wait_time);
+
+ // The order of these events is very subtle, modify with care.
+
+
+ // Instantiate the token guard which will try grabbing the token for
+ // this thread.
+ ACE_TP_Token_Guard guard (this->token_);
+
+
+ int result = guard.grab_token (max_wait_time);
+
+ // If the guard is NOT the owner just return the retval
+ if (!guard.is_owner ())
+ return result;
+
+ // After getting the lock just just for deactivation..
+ if (this->deactivated_)
+ return -1;
+
+ // Update the countdown to reflect time waiting for the token.
+ countdown.update ();
+
+
+ return this->dispatch_i (max_wait_time,
+ guard);
+}
+
+
+int
+ACE_TP_Reactor::remove_handler (ACE_Event_Handler *eh,
+ ACE_Reactor_Mask mask)
+{
+ // Artificial scoping for grabbing and releasing the token
+ {
+ ACE_TP_Token_Guard guard (this->token_);
+
+ // Acquire the token
+ int result = guard.acquire_token ();
+
+ if (!guard.is_owner ())
+ return result;
+
+ // Call the remove_handler_i () with a DONT_CALL mask. We dont
+ // want to call the handle_close with the token held.
+ if (this->remove_handler_i (eh->get_handle (),
+ mask | ACE_Event_Handler::DONT_CALL) == -1)
+ return -1;
+ }
+
+ // Close down the <Event_Handler> unless we've been instructed not
+ // to.
+ if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::DONT_CALL) == 0)
+ eh->handle_close (ACE_INVALID_HANDLE, mask);
+
+ return 0;
+}
+
+int
+ACE_TP_Reactor::remove_handler (ACE_HANDLE handle,
+ ACE_Reactor_Mask mask)
+{
+
+ ACE_Event_Handler *eh = 0;
+
+ // Artificial scoping for grabbing and releasing the token
+ {
+ ACE_TP_Token_Guard guard (this->token_);
+
+ // Acquire the token
+ int result = guard.acquire_token ();
+
+ if (!guard.is_owner ())
+ return result;
+
+ size_t slot = 0;
+ eh = this->handler_rep_.find (handle, &slot);
+
+ if (eh == 0)
+ return -1;
+
+ // Call the remove_handler_i () with a DONT_CALL mask. We dont
+ // want to call the handle_close with the token held.
+ if (this->remove_handler_i (handle,
+ mask | ACE_Event_Handler::DONT_CALL) == -1)
+ return -1;
+ }
+
+ // Close down the <Event_Handler> unless we've been instructed not
+ // to.
+ if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::DONT_CALL) == 0)
+ eh->handle_close (handle, mask);
+
+ return 0;
+}
+
+
+int
+ACE_TP_Reactor::remove_handler (const ACE_Handle_Set &handles,
+ ACE_Reactor_Mask m)
+{
+ // Array of <Event_Handlers> corresponding to <handles>
+ ACE_Event_Handler **aeh = 0;
+
+ // Allocate memory for the size of the handle set
+ ACE_NEW_RETURN (aeh,
+ ACE_Event_Handler *[handles.num_set ()],
+ -1);
+
+ size_t index = 0;
+
+ // Artificial scoping for grabbing and releasing the token
+ {
+ ACE_TP_Token_Guard guard (this->token_);
+
+ // Acquire the token
+ int result = guard.acquire_token ();
+
+ if (!guard.is_owner ())
+ return result;
+
+ ACE_HANDLE h;
+
+ ACE_Handle_Set_Iterator handle_iter (handles);
+
+ while ((h = handle_iter ()) != ACE_INVALID_HANDLE)
+ {
+ size_t slot = 0;
+ ACE_Event_Handler *eh =
+ this->handler_rep_.find (h, &slot);
+
+ if (this->remove_handler_i (h,
+ m | ACE_Event_Handler::DONT_CALL) == -1)
+ {
+ delete [] aeh;
+ return -1;
+ }
+
+ aeh [index] = eh;
+ index ++;
+ }
+ }
+
+ // Close down the <Event_Handler> unless we've been instructed not
+ // to.
+ if (ACE_BIT_ENABLED (m, ACE_Event_Handler::DONT_CALL) == 0)
+ {
+ for (size_t i = 0; i < index; i++)
+ aeh[i]->handle_close (ACE_INVALID_HANDLE, m);
+ }
+
+ delete [] aeh;
+ return 0;
+}
+
+int
+ACE_TP_Reactor::remove_handler (int /*signum*/,
+ ACE_Sig_Action * /*new_disp*/,
+ ACE_Sig_Action * /*old_disp*/,
+ int /*sigkey*/)
+{
+ ACE_NOTSUP_RETURN (-1);
+}
+
+int
+ACE_TP_Reactor::remove_handler (const ACE_Sig_Set & /*sigset*/)
+{
+ ACE_NOTSUP_RETURN (-1);
+}
+
+
+int
+ACE_TP_Reactor::dispatch_i (ACE_Time_Value *max_wait_time,
+ ACE_TP_Token_Guard &guard)
+{
+ int event_count =
+ this->get_event_for_dispatching (max_wait_time);
+
+ int result = 0;
+
+ // Note: We are passing the <event_count> around, to have record of
+ // how many events still need processing. May be this could be
+ // useful in future.
+
+ // Dispatch signals
+ if (event_count == -1)
+ {
+ // Looks like we dont do any upcalls in dispatch signals. If at
+ // a later point of time, we decide to handle signals we have to
+ // release the lock before we make any upcalls.. What is here
+ // now is not the right thing...
+ // @@ We need to do better..
+ return this->handle_signals (event_count,
+ guard);
+ }
+
+ // If there are no signals and if we had received a proper
+ // event_count then first look at dispatching timeouts. We need to
+ // handle timers early since they may have higher latency
+ // constraints than I/O handlers. Ideally, the order of
+ // dispatching should be a strategy...
+
+ // NOTE: The event count does not have the number of timers that
+ // needs dispatching. But we are still passing this along. We dont
+ // need to do that. In the future we *may* have the timers also
+ // returned through the <event_count>. Just passing that along for
+ // that day.
+ result = this->handle_timer_events (event_count,
+ guard);
+
+ if (result > 0)
+ return result;
+
+
+ // Else justgo ahead fall through for further handling
+
+ if (event_count > 0)
+ {
+ // Next dispatch the notification handlers (if there are any to
+ // dispatch). These are required to handle multiple-threads that
+ // are trying to update the <Reactor>.
+ result = this->handle_notify_events (event_count,
+ guard);
+
+ if (result > 0)
+ return result;
+
+ // Else just fall through for further handling
+ }
+
+ if (event_count > 0)
+ {
+ // Handle socket events
+ return this->handle_socket_events (event_count,
+ guard);
+ }
+
+ return 0;
+
+}
+
+
+
+
+int
+ACE_TP_Reactor::handle_signals (int & /*event_count*/,
+ ACE_TP_Token_Guard & /*guard*/)
+{
+ ACE_TRACE ("ACE_TP_Reactor::handle_signals");
+
+ /*
+ *
+ * THIS METHOD SEEMS BROKEN
+ *
+ *
+ */
+ // First check for interrupts.
+ // Bail out -- we got here since <select> was interrupted.
+ if (ACE_Sig_Handler::sig_pending () != 0)
+ {
+ ACE_Sig_Handler::sig_pending (0);
+
+ // This piece of code comes from the old TP_Reactor. We did not
+ // handle signals at all then. If we happen to handle signals
+ // in the TP_Reactor, we should then start worryiung about this
+ // - Bala 21-Aug- 01
+#if 0
+ // Not sure if this should be done in the TP_Reactor
+ // case... leave it out for now. -Steve Huston 22-Aug-00
+
+ // If any HANDLES in the <ready_set_> are activated as a
+ // result of signals they should be dispatched since
+ // they may be time critical...
+ active_handle_count = this->any_ready (dispatch_set);
+ #else
+ // active_handle_count = 0;
+#endif
+
+ // Record the fact that the Reactor has dispatched a
+ // handle_signal() method. We need this to return the
+ // appropriate count.
+ return 1;
+ }
+
+ return -1;
+}
+
+
+int
+ACE_TP_Reactor::handle_timer_events (int & /*event_count*/,
+ ACE_TP_Token_Guard &guard)
+{
+ // Get the current time
+ ACE_Time_Value cur_time (this->timer_queue_->gettimeofday () +
+ this->timer_queue_->timer_skew ());
+
+ // Look for a node in the timer queue whose timer <= the present
+ // time.
+ ACE_Timer_Node_Dispatch_Info info;
+
+ if (this->timer_queue_->dispatch_info (cur_time,
+ info))
+ {
+ // Release the token before dispatching notifies...
+ guard.release_token ();
+
+ // call the functor
+ this->timer_queue_->upcall (info.type_,
+ info.act_,
+ cur_time);
+
+ // We have dispatched a timer
+ return 1;
+ }
+
+ return 0;
+}
+
+
+
+int
+ACE_TP_Reactor::handle_notify_events (int & /*event_count*/,
+ ACE_TP_Token_Guard &guard)
+{
+ // Get the handle on which notify calls could have occured
+ ACE_HANDLE notify_handle =
+ this->get_notify_handle ();
+
+ int result = 0;
+
+ // The notify was not in the list returned by
+ // wait_for_multiple_events ().
+ if (notify_handle == ACE_INVALID_HANDLE)
+ return result;
+
+ // Now just do a read on the pipe..
+ ACE_Notification_Buffer buffer;
+
+ // Clear the handle of the read_mask of our <ready_set_>
+ this->ready_set_.rd_mask_.clr_bit (notify_handle);
+
+ // Keep reading notifies till we empty it or till we have a
+ // dispatchable buffer
+ while (this->notify_handler_->read_notify_pipe (notify_handle,
+ buffer) > 0)
+ {
+ // Just figure out whether we can read any buffer that has
+ // dispatchable info. If not we have just been unblocked by
+ // another thread trying to update the reactor. If we get any
+ // buffer that needs dispatching we will dispatch that after
+ // releasing the lock
+ if (this->notify_handler_->is_dispatchable (buffer) > 0)
+ {
+ // Release the token before dispatching notifies...
+ guard.release_token ();
+
+ // Dispatch the upcall for the notify
+ this->notify_handler_->dispatch_notify (buffer);
+
+ // We had a successful dispatch.
+ result = 1;
+
+ // break out of the while loop
+ break;
+ }
+ }
+
+ // If we did ssome work, then we just return 1 which will allow us
+ // to get out of here. If we return 0, then we will be asked to do
+ // some work ie. dispacth socket events
+ return result;
+}
+
+int
+ACE_TP_Reactor::handle_socket_events (int &event_count,
+ ACE_TP_Token_Guard &guard)
+{
+
+ // We got the lock, lets handle some events. Note: this method will
+ // *not* dispatch any I/O handlers. It will dispatch signals,
+ // timeouts, and notifications.
+ ACE_EH_Dispatch_Info dispatch_info;
+
+ this->get_socket_event_info (dispatch_info);
+
+ // If there is any event handler that is ready to be dispatched, the
+ // dispatch information is recorded in dispatch_info.
+ if (dispatch_info.dispatch ())
+ {
+ // Suspend the handler so that other threads don't start
+ // dispatching it.
+ // NOTE: This check was performed in older versions of the
+ // TP_Reactor. Looks like it is a waste..
+ if (dispatch_info.event_handler_ != this->notify_handler_)
+ this->suspend_i (dispatch_info.handle_);
+ }
+
+ // Release the lock. Others threads can start waiting.
+ guard.release_token ();
+
+ int result = 0;
+
+ // If there was an event handler ready, dispatch it.
+ if (dispatch_info.dispatch ())
+ {
+ /// Decrement the event left
+ --event_count;
+
+ if (this->dispatch_socket_event (dispatch_info) == 0)
+ ++result; // Dispatched an event
+
+ int flag = 0;
+
+ // Hack of the decade ;-). We make an extra check for the handle
+ // in addition to the event handler before we make a check for
+ // the resume_handler ().
+ if (dispatch_info.event_handler_ != 0 &&
+ this->handler_rep_.find (dispatch_info.handle_) != 0)
+ {
+ flag =
+ dispatch_info.event_handler_->resume_handler ();
+ }
+
+ if (dispatch_info.handle_ != ACE_INVALID_HANDLE &&
+ dispatch_info.event_handler_ != this->notify_handler_ &&
+ flag == 0)
+ this->resume_handler (dispatch_info.handle_);
+ }
+
+ return result;
+}
+
+int
+ACE_TP_Reactor::mask_ops (ACE_HANDLE handle,
+ ACE_Reactor_Mask mask,
+ int ops)
+{
+ ACE_TRACE ("ACE_TP_Reactor::mask_ops");
+ ACE_MT (ACE_GUARD_RETURN (ACE_Select_Reactor_Token,
+ ace_mon, this->token_, -1));
+
+ int result = 0;
+
+ // If it looks like the handle isn't suspended, then
+ // set the ops on the wait_set_, otherwise set the suspend_set_.
+
+ if (this->suspend_set_.rd_mask_.is_set (handle) == 0
+ && this->suspend_set_.wr_mask_.is_set (handle) == 0
+ && this->suspend_set_.ex_mask_.is_set (handle) == 0)
+
+ result = this->bit_ops (handle, mask,
+ this->wait_set_,
+ ops);
+ else
+
+ result = this->bit_ops (handle, mask,
+ this->suspend_set_,
+ ops);
+
+ return result;
+}
+
+
+
+int
+ACE_TP_Reactor::get_event_for_dispatching (ACE_Time_Value *max_wait_time)
+{
+
+ // If the reactor handler state has changed, clear any remembered
+ // ready bits and re-scan from the master wait_set.
+ if (this->state_changed_)
+ {
+ this->ready_set_.rd_mask_.reset ();
+ this->ready_set_.wr_mask_.reset ();
+ this->ready_set_.ex_mask_.reset ();
+ this->state_changed_ = 0;
+ }
+ else
+ {
+ // This is a hack... somewhere, under certain conditions (which
+ // I don't understand...) the mask will have all of its bits clear,
+ // yet have a size_ > 0. This is an attempt to remedy the affect,
+ // without knowing why it happens.
+
+ //# if !(defined (__SUNPRO_CC) && (__SUNPRO_CC > 0x500))
+ // SunCC seems to be having problems with this piece of code
+ // here. I am not sure why though. This works fine with other
+ // compilers. As we dont seem to understand when this piece of
+ // code is needed and as it creates problems for SunCC we will
+ // not compile this. Most of the tests in TAO seem to be happy
+ // without this in SunCC.
+ this->ready_set_.rd_mask_.sync (this->ready_set_.rd_mask_.max_set ());
+ this->ready_set_.wr_mask_.sync (this->ready_set_.wr_mask_.max_set ());
+ this->ready_set_.ex_mask_.sync (this->ready_set_.ex_mask_.max_set ());
+ //# endif /* ! __SUNPRO_CC */
+
+ }
+
+ return this->wait_for_multiple_events (this->ready_set_,
+ max_wait_time);
+}
+
+int
+ACE_TP_Reactor::get_socket_event_info (ACE_EH_Dispatch_Info &event)
+{
+ event.reset (); // Nothing to dispatch yet
+
+ // Check for dispatch in write, except, read. Only catch one, but if
+ // one is caught, be sure to clear the handle from each mask in case
+ // there is more than one mask set for it. This would cause problems
+ // if the handler is suspended for dispatching, but its set bit in
+ // another part of ready_set_ kept it from being dispatched.
+ int found_io = 0;
+ ACE_HANDLE handle;
+
+ // @@todo: We can do quite a bit of code reduction here. Let me get
+ // it to work before I do this.
+ {
+ ACE_Handle_Set_Iterator handle_iter (this->ready_set_.wr_mask_);
+
+ while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE)
+ {
+ if (this->is_suspended_i (handle))
+ continue;
+
+ // Remember this info
+ event.set (handle,
+ this->handler_rep_.find (handle),
+ ACE_Event_Handler::WRITE_MASK,
+ &ACE_Event_Handler::handle_output);
+ this->ready_set_.wr_mask_.clr_bit (handle);
+ this->ready_set_.ex_mask_.clr_bit (handle);
+ this->ready_set_.rd_mask_.clr_bit (handle);
+ found_io = 1;
+ }
+ }
+
+ if (!found_io)
+ {
+ ACE_Handle_Set_Iterator handle_iter (this->ready_set_.ex_mask_);
+
+ while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE)
+ {
+ if (this->is_suspended_i (handle))
+ continue;
+
+ // Remember this info
+ event.set (handle,
+ this->handler_rep_.find (handle),
+ ACE_Event_Handler::EXCEPT_MASK,
+ &ACE_Event_Handler::handle_exception);
+ this->ready_set_.ex_mask_.clr_bit (handle);
+ this->ready_set_.wr_mask_.clr_bit (handle);
+ this->ready_set_.rd_mask_.clr_bit (handle);
+ found_io = 1;
+ }
+ }
+
+ if (!found_io)
+ {
+ ACE_Handle_Set_Iterator handle_iter (this->ready_set_.rd_mask_);
+
+ while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE)
+ {
+ if (this->is_suspended_i (handle))
+ continue;
+
+ // Remember this info
+ event.set (handle,
+ this->handler_rep_.find (handle),
+ ACE_Event_Handler::READ_MASK,
+ &ACE_Event_Handler::handle_input);
+ this->ready_set_.rd_mask_.clr_bit (handle);
+ this->ready_set_.wr_mask_.clr_bit (handle);
+ this->ready_set_.ex_mask_.clr_bit (handle);
+ found_io = 1;
+ }
+ }
+
+ return found_io;
+}
+
+
+
+// Dispatches a single event handler
+int
+ACE_TP_Reactor::dispatch_socket_event (ACE_EH_Dispatch_Info &dispatch_info)
+{
+ ACE_TRACE ("ACE_TP_Reactor::dispatch_socket_event");
+
+ ACE_HANDLE handle = dispatch_info.handle_;
+ ACE_Event_Handler *event_handler = dispatch_info.event_handler_;
+ ACE_Reactor_Mask mask = dispatch_info.mask_;
+ ACE_EH_PTMF callback = dispatch_info.callback_;
+
+ // Check for removed handlers.
+ if (event_handler == 0)
+ return -1;
+
+ // Upcall. If the handler returns positive value (requesting a
+ // reactor callback) don't set the ready-bit because it will be
+ // ignored if the reactor state has changed. Just call back
+ // as many times as the handler requests it. Other threads are off
+ // handling other things.
+ int status = 1;
+ while (status > 0)
+ status = (event_handler->*callback) (handle);
+
+ // If negative, remove from Reactor
+ if (status < 0)
+ {
+ int retval =
+ this->remove_handler (handle, mask);
+
+ // As the handler is no longer valid, invalidate the handle
+ dispatch_info.event_handler_ = 0;
+ dispatch_info.handle_ = ACE_INVALID_HANDLE;
+
+ return retval;
+ }
+
+ // assert (status >= 0);
+ return 0;
+}
+
+int
+ACE_TP_Reactor::resumable_handler (void)
+{
+ return 1;
+}
+
+int
+ACE_TP_Reactor::handle_events (ACE_Time_Value &max_wait_time)
+{
+ return ACE_Select_Reactor::handle_events (&max_wait_time);
+}
+
+int
+ACE_TP_Reactor::mask_ops (ACE_Event_Handler *eh,
+ ACE_Reactor_Mask mask,
+ int ops)
+{
+ return this->mask_ops (eh->get_handle (), mask, ops);
+}
+
+void
+ACE_TP_Reactor::notify_handle (ACE_HANDLE,
+ ACE_Reactor_Mask,
+ ACE_Handle_Set &,
+ ACE_Event_Handler *,
+ ACE_EH_PTMF)
+{
+ ACE_ERROR ((LM_ERROR,
+ ACE_LIB_TEXT ("ACE_TP_Reactor::notify_handle: Wrong version of notify_handle() gets called")));
+}
+
+ACE_HANDLE
+ACE_TP_Reactor::get_notify_handle (void)
+{
+ // Call the notify handler to get a handle on which we would have a
+ // notify waiting
+ ACE_HANDLE read_handle =
+ this->notify_handler_->notify_handle ();
+
+ // Check whether the rd_mask has been set on that handle. If so
+ // return the handle.
+ // if (read_handle != ACE_INVALID_HANDLE &&
+ //this->ready_set_.rd_mask_.is_set (read_handle))
+ if (read_handle != ACE_INVALID_HANDLE)
+ {
+ ACE_Handle_Set_Iterator handle_iter (this->ready_set_.rd_mask_);
+ ACE_HANDLE handle = ACE_INVALID_HANDLE;
+
+ while ((handle = handle_iter ()) == read_handle)
+ {
+ return read_handle;
+ }
+ ACE_UNUSED_ARG (handle);
+ }
+
+ // None found..
+ return ACE_INVALID_HANDLE;
+}