diff options
author | bala <bala@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2001-09-01 00:27:25 +0000 |
---|---|---|
committer | bala <bala@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2001-09-01 00:27:25 +0000 |
commit | 4d5d5f1fe58ea5ec308e7346f55a8df7334a8fd8 (patch) | |
tree | 99ae654dd4d7d3f5b5da3f28afd1a1293d2e9c9b | |
parent | 0db1d51d84a7714a81acbe9e6106732c9f9bf97b (diff) | |
download | ATCD-4d5d5f1fe58ea5ec308e7346f55a8df7334a8fd8.tar.gz |
ChangeLogTag: Fri Aug 31 19:14:52 2001 Balachandran Natarajan <bala@cs.wustl.edu>
-rw-r--r-- | ChangeLog | 41 | ||||
-rw-r--r-- | ChangeLogs/ChangeLog-02a | 41 | ||||
-rw-r--r-- | ChangeLogs/ChangeLog-03a | 41 | ||||
-rw-r--r-- | ace/Reactor_Impl.h | 7 | ||||
-rw-r--r-- | ace/Select_Reactor_Base.cpp | 222 | ||||
-rw-r--r-- | ace/Select_Reactor_Base.h | 10 | ||||
-rw-r--r-- | ace/TP_Reactor.cpp | 495 | ||||
-rw-r--r-- | ace/TP_Reactor.h | 31 | ||||
-rw-r--r-- | ace/TP_Reactor.i | 10 |
9 files changed, 537 insertions, 361 deletions
diff --git a/ChangeLog b/ChangeLog index 2b6981c5cd5..08a9ae8da47 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,44 @@ +Fri Aug 31 19:14:52 2001 Balachandran Natarajan <bala@cs.wustl.edu> + + * ace/Reactor_Impl.h: Added a new method by name read_notify_pipe + (). Also changed the dispatch_notify () to take in a + Notification buffer instead of a ACE_HANDLE. + + * ace/Select_Reactor_Base.cpp: + * ace/Select_Reactor_Base.h: Made the following changes + + - Implemented read_notify_pipe (). Will read just one message from + the notify pipe or one message from the notification queue. + + - Reimplemented dispatch_notify (). This method would just + dispatch the upcall using the information in the notification + buffer. + + - The handle_input () now uses the read_notify_pipe () and + dispatch_notify () to achieve what it was doing before. + + - The notify () call now sends one notify message on the pipe + for every message in the notification queue. + + * ace/TP_Reactor.cpp (handle_socket_events): + * ace/TP_Reactor.h: + * ace/TP_Reactor.i: We had a race condition. The race condition + was because two threads were trying to read from the notify_pipe + at the same instance. This race condition was fixed by adding a + call to read_notify_pipe () with the lock held and then calling + dispatch_notify () with the buffer read after releasing the + lock. Did the following minor modifications + + - Changed dispatch_socket_events () as dispatch_socket_event () + as we were dispatching only one event. + + - We dont grab the token in the constructor of the token. We + have to make a call specfically to grab_token () to get the + token. + + The above checkins should fix the correctness of the reactor + problems that we have been seeing. + Fri Aug 31 18:30:28 2001 Krishnakumar B <kitty@cs.wustl.edu> * bin/auto_run_tests.lst: diff --git a/ChangeLogs/ChangeLog-02a b/ChangeLogs/ChangeLog-02a index 2b6981c5cd5..08a9ae8da47 100644 --- a/ChangeLogs/ChangeLog-02a +++ b/ChangeLogs/ChangeLog-02a @@ -1,3 +1,44 @@ +Fri Aug 31 19:14:52 2001 Balachandran Natarajan <bala@cs.wustl.edu> + + * ace/Reactor_Impl.h: Added a new method by name read_notify_pipe + (). Also changed the dispatch_notify () to take in a + Notification buffer instead of a ACE_HANDLE. + + * ace/Select_Reactor_Base.cpp: + * ace/Select_Reactor_Base.h: Made the following changes + + - Implemented read_notify_pipe (). Will read just one message from + the notify pipe or one message from the notification queue. + + - Reimplemented dispatch_notify (). This method would just + dispatch the upcall using the information in the notification + buffer. + + - The handle_input () now uses the read_notify_pipe () and + dispatch_notify () to achieve what it was doing before. + + - The notify () call now sends one notify message on the pipe + for every message in the notification queue. + + * ace/TP_Reactor.cpp (handle_socket_events): + * ace/TP_Reactor.h: + * ace/TP_Reactor.i: We had a race condition. The race condition + was because two threads were trying to read from the notify_pipe + at the same instance. This race condition was fixed by adding a + call to read_notify_pipe () with the lock held and then calling + dispatch_notify () with the buffer read after releasing the + lock. Did the following minor modifications + + - Changed dispatch_socket_events () as dispatch_socket_event () + as we were dispatching only one event. + + - We dont grab the token in the constructor of the token. We + have to make a call specfically to grab_token () to get the + token. + + The above checkins should fix the correctness of the reactor + problems that we have been seeing. + Fri Aug 31 18:30:28 2001 Krishnakumar B <kitty@cs.wustl.edu> * bin/auto_run_tests.lst: diff --git a/ChangeLogs/ChangeLog-03a b/ChangeLogs/ChangeLog-03a index 2b6981c5cd5..08a9ae8da47 100644 --- a/ChangeLogs/ChangeLog-03a +++ b/ChangeLogs/ChangeLog-03a @@ -1,3 +1,44 @@ +Fri Aug 31 19:14:52 2001 Balachandran Natarajan <bala@cs.wustl.edu> + + * ace/Reactor_Impl.h: Added a new method by name read_notify_pipe + (). Also changed the dispatch_notify () to take in a + Notification buffer instead of a ACE_HANDLE. + + * ace/Select_Reactor_Base.cpp: + * ace/Select_Reactor_Base.h: Made the following changes + + - Implemented read_notify_pipe (). Will read just one message from + the notify pipe or one message from the notification queue. + + - Reimplemented dispatch_notify (). This method would just + dispatch the upcall using the information in the notification + buffer. + + - The handle_input () now uses the read_notify_pipe () and + dispatch_notify () to achieve what it was doing before. + + - The notify () call now sends one notify message on the pipe + for every message in the notification queue. + + * ace/TP_Reactor.cpp (handle_socket_events): + * ace/TP_Reactor.h: + * ace/TP_Reactor.i: We had a race condition. The race condition + was because two threads were trying to read from the notify_pipe + at the same instance. This race condition was fixed by adding a + call to read_notify_pipe () with the lock held and then calling + dispatch_notify () with the buffer read after releasing the + lock. Did the following minor modifications + + - Changed dispatch_socket_events () as dispatch_socket_event () + as we were dispatching only one event. + + - We dont grab the token in the constructor of the token. We + have to make a call specfically to grab_token () to get the + token. + + The above checkins should fix the correctness of the reactor + problems that we have been seeing. + Fri Aug 31 18:30:28 2001 Krishnakumar B <kitty@cs.wustl.edu> * bin/auto_run_tests.lst: diff --git a/ace/Reactor_Impl.h b/ace/Reactor_Impl.h index 9a279e1dea8..b64da0fa527 100644 --- a/ace/Reactor_Impl.h +++ b/ace/Reactor_Impl.h @@ -76,8 +76,13 @@ public: /// Handle one of the notify call on the <handle>. This could be /// because of a thread trying to unblock the <Reactor_Impl> - virtual int dispatch_notify (ACE_HANDLE handle) = 0; + virtual int dispatch_notify (ACE_Notification_Buffer &buffer) = 0; + /// Read one of the notify call on the <handle> into the + /// <buffer>. This could be because of a thread trying to unblock + /// the <Reactor_Impl> + virtual int read_notify_pipe (ACE_HANDLE handle, + ACE_Notification_Buffer &buffer) = 0; /** * Set the maximum number of times that the <handle_input> method * will iterate and dispatch the <ACE_Event_Handlers> that are diff --git a/ace/Select_Reactor_Base.cpp b/ace/Select_Reactor_Base.cpp index 12402471eba..6eaecc133f3 100644 --- a/ace/Select_Reactor_Base.cpp +++ b/ace/Select_Reactor_Base.cpp @@ -684,8 +684,10 @@ ACE_Select_Reactor_Notify::notify (ACE_Event_Handler *eh, ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1); // No pending notifications. - if (this->notify_queue_.is_empty ()) - notification_required = 1; + + // We will send notify for every message.. + // if (this->notify_queue_.is_empty ()) + // notification_required = 1; ACE_Notification_Buffer *temp = 0; @@ -718,7 +720,8 @@ ACE_Select_Reactor_Notify::notify (ACE_Event_Handler *eh, if (notify_queue_.enqueue_tail (temp) == -1) return -1; - if (notification_required) + // Let us send a notify for every message + // if (notification_required) { ssize_t n = ACE::send (this->notification_pipe_.write_handle (), (char *) &buffer, @@ -782,11 +785,110 @@ ACE_Select_Reactor_Notify::notify_handle (void) // Thanks to Paul Stephenson for suggesting this approach. int -ACE_Select_Reactor_Notify::dispatch_notify (ACE_HANDLE handle) +ACE_Select_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &buffer) +{ + int result = 0; +#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) + // Dispatch all messages that are in the <notify_queue_>. + { + // We acquire the lock in a block to make sure we're not + // holding the lock while delivering callbacks... + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1); + + ACE_Notification_Buffer *temp; + + if (notify_queue_.is_empty ()) + break; + else if (notify_queue_.dequeue_head (temp) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("%p\n"), + ACE_LIB_TEXT ("dequeue_head")), + -1); + buffer = *temp; + if (free_queue_.enqueue_head (temp) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("%p\n"), + ACE_LIB_TEXT ("enqueue_head")), + -1); + } + + // If eh == 0 then another thread is unblocking the + // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s + // internal structures. Otherwise, we need to dispatch the + // appropriate handle_* method on the <ACE_Event_Handler> + // pointer we've been passed. + if (buffer.eh_ != 0) + { + int result = 0; + + switch (buffer.mask_) + { + case ACE_Event_Handler::READ_MASK: + case ACE_Event_Handler::ACCEPT_MASK: + result = buffer.eh_->handle_input (ACE_INVALID_HANDLE); + break; + case ACE_Event_Handler::WRITE_MASK: + result = buffer.eh_->handle_output (ACE_INVALID_HANDLE); + break; + case ACE_Event_Handler::EXCEPT_MASK: + result = buffer.eh_->handle_exception (ACE_INVALID_HANDLE); + break; + default: + // Should we bail out if we get an invalid mask? + ACE_ERROR ((LM_ERROR, ACE_LIB_TEXT ("invalid mask = %d\n"), buffer.mask_)); + } + if (result == -1) + buffer.eh_->handle_close (ACE_INVALID_HANDLE, + ACE_Event_Handler::EXCEPT_MASK); + } +#else + // If eh == 0 then another thread is unblocking the + // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s + // internal structures. Otherwise, we need to dispatch the + // appropriate handle_* method on the <ACE_Event_Handler> + // pointer we've been passed. + if (buffer.eh_ != 0) + { + switch (buffer.mask_) + { + case ACE_Event_Handler::READ_MASK: + case ACE_Event_Handler::ACCEPT_MASK: + result = buffer.eh_->handle_input (ACE_INVALID_HANDLE); + break; + case ACE_Event_Handler::WRITE_MASK: + result = buffer.eh_->handle_output (ACE_INVALID_HANDLE); + break; + case ACE_Event_Handler::EXCEPT_MASK: + result = buffer.eh_->handle_exception (ACE_INVALID_HANDLE); + break; + case ACE_Event_Handler::QOS_MASK: + result = buffer.eh_->handle_qos (ACE_INVALID_HANDLE); + break; + case ACE_Event_Handler::GROUP_QOS_MASK: + result = buffer.eh_->handle_group_qos (ACE_INVALID_HANDLE); + break; + default: + // Should we bail out if we get an invalid mask? + ACE_ERROR ((LM_ERROR, + ACE_LIB_TEXT ("invalid mask = %d\n"), + buffer.mask_)); + } + if (result == -1) + buffer.eh_->handle_close (ACE_INVALID_HANDLE, + ACE_Event_Handler::EXCEPT_MASK); + } + +#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ + + return result; +} + +int +ACE_Select_Reactor_Notify::read_notify_pipe (ACE_HANDLE handle, + ACE_Notification_Buffer &buffer) { ACE_TRACE ("ACE_Select_Reactor_Notify::dispatch_notify"); - ACE_Notification_Buffer buffer; ssize_t n = 0; if ((n = ACE::recv (handle, (char *) &buffer, sizeof buffer)) > 0) @@ -806,102 +908,7 @@ ACE_Select_Reactor_Notify::dispatch_notify (ACE_HANDLE handle) return -1; } -#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) - // Dispatch all messages that are in the <notify_queue_>. - for (;;) - { - { - // We acquire the lock in a block to make sure we're not - // holding the lock while delivering callbacks... - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1); - - ACE_Notification_Buffer *temp; - - if (notify_queue_.is_empty ()) - break; - else if (notify_queue_.dequeue_head (temp) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%p\n"), - ACE_LIB_TEXT ("dequeue_head")), - -1); - buffer = *temp; - if (free_queue_.enqueue_head (temp) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%p\n"), - ACE_LIB_TEXT ("enqueue_head")), - -1); - } - - // If eh == 0 then another thread is unblocking the - // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s - // internal structures. Otherwise, we need to dispatch the - // appropriate handle_* method on the <ACE_Event_Handler> - // pointer we've been passed. - if (buffer.eh_ != 0) - { - int result = 0; - - switch (buffer.mask_) - { - case ACE_Event_Handler::READ_MASK: - case ACE_Event_Handler::ACCEPT_MASK: - result = buffer.eh_->handle_input (ACE_INVALID_HANDLE); - break; - case ACE_Event_Handler::WRITE_MASK: - result = buffer.eh_->handle_output (ACE_INVALID_HANDLE); - break; - case ACE_Event_Handler::EXCEPT_MASK: - result = buffer.eh_->handle_exception (ACE_INVALID_HANDLE); - break; - default: - // Should we bail out if we get an invalid mask? - ACE_ERROR ((LM_ERROR, ACE_LIB_TEXT ("invalid mask = %d\n"), buffer.mask_)); - } - if (result == -1) - buffer.eh_->handle_close (ACE_INVALID_HANDLE, - ACE_Event_Handler::EXCEPT_MASK); - } - } -#else - // If eh == 0 then another thread is unblocking the - // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s - // internal structures. Otherwise, we need to dispatch the - // appropriate handle_* method on the <ACE_Event_Handler> - // pointer we've been passed. - if (buffer.eh_ != 0) - { - int result = 0; - - switch (buffer.mask_) - { - case ACE_Event_Handler::READ_MASK: - case ACE_Event_Handler::ACCEPT_MASK: - result = buffer.eh_->handle_input (ACE_INVALID_HANDLE); - break; - case ACE_Event_Handler::WRITE_MASK: - result = buffer.eh_->handle_output (ACE_INVALID_HANDLE); - break; - case ACE_Event_Handler::EXCEPT_MASK: - result = buffer.eh_->handle_exception (ACE_INVALID_HANDLE); - break; - case ACE_Event_Handler::QOS_MASK: - result = buffer.eh_->handle_qos (ACE_INVALID_HANDLE); - break; - case ACE_Event_Handler::GROUP_QOS_MASK: - result = buffer.eh_->handle_group_qos (ACE_INVALID_HANDLE); - break; - default: - // Should we bail out if we get an invalid mask? - ACE_ERROR ((LM_ERROR, - ACE_LIB_TEXT ("invalid mask = %d\n"), - buffer.mask_)); - } - if (result == -1) - buffer.eh_->handle_close (ACE_INVALID_HANDLE, - ACE_Event_Handler::EXCEPT_MASK); - } -#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ return 1; } @@ -922,9 +929,20 @@ ACE_Select_Reactor_Notify::handle_input (ACE_HANDLE handle) int number_dispatched = 0; int result = 0; - while ((result = this->dispatch_notify (handle) > 0)) + ACE_Notification_Buffer buffer; + + while ((result = this->read_notify_pipe (handle, + buffer) > 0)) { - number_dispatched++; + // Dispatch the buffer + if (this->dispatch_notify (buffer) > 0) + number_dispatched++; + else + { + // If there are any errors in dispatching... + result = -1; + break; + } // Bail out if we've reached the <notify_threshold_>. Note that // by default <notify_threshold_> is -1, so we'll loop until all diff --git a/ace/Select_Reactor_Base.h b/ace/Select_Reactor_Base.h index 9d60cb1f8aa..db34c2bd0aa 100644 --- a/ace/Select_Reactor_Base.h +++ b/ace/Select_Reactor_Base.h @@ -151,7 +151,13 @@ public: /// Handle one of the notify call on the <handle>. This could be /// because of a thread trying to unblock the <Reactor_Impl> - virtual int dispatch_notify (ACE_HANDLE handle); + virtual int dispatch_notify (ACE_Notification_Buffer &buffer); + + /// Read one of the notify call on the <handle> into the + /// <buffer>. This could be because of a thread trying to unblock + /// the <Reactor_Impl> + virtual int read_notify_pipe (ACE_HANDLE handle, + ACE_Notification_Buffer &buffer); /// Called back by the <ACE_Select_Reactor> when a thread wants to /// unblock us. @@ -193,7 +199,7 @@ public: /// Declare the dynamic allocation hooks. ACE_ALLOC_HOOK_DECLARE; -private: +protected: /** * Keep a back pointer to the <ACE_Select_Reactor>. If this value * if NULL then the <ACE_Select_Reactor> has been initialized with diff --git a/ace/TP_Reactor.cpp b/ace/TP_Reactor.cpp index 8b8fbc97c40..2fffb8b6b9a 100644 --- a/ace/TP_Reactor.cpp +++ b/ace/TP_Reactor.cpp @@ -4,7 +4,6 @@ #include "ace/TP_Reactor.h" #include "ace/Reactor.h" #include "ace/Thread.h" -#include "ace/Log_Msg.h" #if !defined (__ACE_INLINE__) #include "ace/TP_Reactor.i" @@ -56,9 +55,6 @@ ACE_TP_Token_Guard::grab_token (ACE_Time_Value *max_wait_time) return result; } -/************************************************************************/ -// Methods for ACE_TP_Reactor -/************************************************************************/ ACE_TP_Reactor::ACE_TP_Reactor (ACE_Sig_Handler *sh, ACE_Timer_Queue *tq, @@ -80,23 +76,25 @@ ACE_TP_Reactor::ACE_TP_Reactor (size_t size, this->supress_notify_renew (1); } - - -void -ACE_TP_Reactor::max_notify_iterations (int /*iterations*/) +int +ACE_TP_Reactor::owner (ACE_thread_t, ACE_thread_t *o_id) { - ACE_TRACE ("ACE_TP_Reactor::max_notify_iterations"); + ACE_TRACE ("ACE_TP_Reactor::owner"); + if (o_id) + *o_id = ACE_Thread::self (); + + return 0; - ACE_ERROR ((LM_ERROR, - "(%P|%t) This has no effect on the notify processing \n")); } int -ACE_TP_Reactor::max_notify_iterations (void) +ACE_TP_Reactor::owner (ACE_thread_t *t_id) { - ACE_TRACE ("ACE_TP_Reactor::max_notify_iterations"); + ACE_TRACE ("ACE_TP_Reactor::owner"); + *t_id = ACE_Thread::self (); return 0; + } @@ -105,42 +103,50 @@ ACE_TP_Reactor::handle_events (ACE_Time_Value *max_wait_time) { ACE_TRACE ("ACE_TP_Reactor::handle_events"); - int result = 0; - // Stash the current time -- the destructor of this object will // automatically compute how much time elpased since this method was // called. ACE_Countdown_Time countdown (max_wait_time); + // The order of these events is very subtle, modify with care. + + // Instantiate the token guard which will try grabbing the token for // this thread. - ACE_TP_Token_Guard guard (this->token_, - max_wait_time, - result); + ACE_TP_Token_Guard guard (this->token_); + + + int result = guard.grab_token (max_wait_time); // If the guard is NOT the owner just return the retval if (!guard.is_owner ()) return result; + // After getting the lock just just for deactivation.. + if (this->deactivated_) + return -1; + // Update the countdown to reflect time waiting for the token. countdown.update (); - // After acquiring the lock, check if we have been deactivated. If - // we are deactivated, simply return without handling further - // events. - if (this->deactivated_) - { - return -1; - } + return this->dispatch_i (max_wait_time, + guard); +} - // We got the lock, lets handle some events. We collect the events - // that we need to handle. We release the token and then handle - // those events that needs handling. + +int +ACE_TP_Reactor::dispatch_i (ACE_Time_Value *max_wait_time, + ACE_TP_Token_Guard &guard) +{ int event_count = this->get_event_for_dispatching (max_wait_time); + int result = 0; + // Note: We are passing the <event_count> around, to have record of + // how many events still need processing. May be this could be + // useful in future. // Dispatch signals if (event_count == -1) @@ -161,14 +167,14 @@ ACE_TP_Reactor::handle_events (ACE_Time_Value *max_wait_time) // handle timers early since they may have higher latency // constraints than I/O handlers. Ideally, the order of // dispatching should be a strategy... - int retval = this->handle_timer_events (event_count, - guard); + result = this->handle_timer_events (event_count, + guard); - if (retval > 0) - return retval; + if (result > 0) + return result; - // Else just fall through for further handling - } + // Else just fall through for further handling + } if (event_count > 0) @@ -176,128 +182,27 @@ ACE_TP_Reactor::handle_events (ACE_Time_Value *max_wait_time) // Next dispatch the notification handlers (if there are any to // dispatch). These are required to handle multiple-threads that // are trying to update the <Reactor>. - int retval = this->handle_notify_events (event_count, - guard); + result = this->handle_notify_events (event_count, + guard); - if (retval > 0) - return retval; + if (result > 0) + return result; - // Else just fall through for further handling + // Else just fall through for further handling } - - if (event_count > 0) - { - // Handle socket events - return this->handle_socket_events (event_count, - guard); - } - - return 0; -} - -int -ACE_TP_Reactor::handle_events (ACE_Time_Value &max_wait_time) -{ - ACE_TRACE ("ACE_TP_Reactor::handle_events"); - return this->handle_events (&max_wait_time); -} - -int -ACE_TP_Reactor::resumable_handler (void) -{ - return 1; -} - -int -ACE_TP_Reactor::mask_ops (ACE_HANDLE handle, - ACE_Reactor_Mask mask, - int ops) -{ - ACE_TRACE ("ACE_TP_Reactor::mask_ops"); - ACE_MT (ACE_GUARD_RETURN (ACE_Select_Reactor_Token, - ace_mon, this->token_, -1)); - - int result = 0; - - // If it looks like the handle isn't suspended, then - // set the ops on the wait_set_, otherwise set the suspend_set_. - - if (this->suspend_set_.rd_mask_.is_set (handle) == 0 - && this->suspend_set_.wr_mask_.is_set (handle) == 0 - && this->suspend_set_.ex_mask_.is_set (handle) == 0) - - result = this->bit_ops (handle, mask, - this->wait_set_, - ops); - else - - result = this->bit_ops (handle, mask, - this->suspend_set_, - ops); - - return result; -} - - -int -ACE_TP_Reactor::mask_ops (ACE_Event_Handler *eh, - ACE_Reactor_Mask mask, - int ops) -{ - ACE_TRACE ("ACE_TP_Reactor::mask_ops"); - return this->mask_ops (eh->get_handle (), mask, ops); -} - - -int -ACE_TP_Reactor::owner (ACE_thread_t, ACE_thread_t *o_id) -{ - ACE_TRACE ("ACE_TP_Reactor::owner"); - if (o_id) - *o_id = ACE_Thread::self (); + if (event_count > 0) + { + // Handle socket events + return this->handle_socket_events (event_count, + guard); + } return 0; } -int -ACE_TP_Reactor::owner (ACE_thread_t *t_id) -{ - ACE_TRACE ("ACE_TP_Reactor::owner"); - *t_id = ACE_Thread::self (); - return 0; -} - - -int -ACE_TP_Reactor::get_event_for_dispatching (ACE_Time_Value *max_wait_time) -{ - ACE_TRACE ("ACE_TP_Reactor::get_event_for_dispatching"); - // If the reactor handler state has changed, clear any remembered - // ready bits and re-scan from the master wait_set. - if (this->state_changed_) - { - this->ready_set_.rd_mask_.reset (); - this->ready_set_.wr_mask_.reset (); - this->ready_set_.ex_mask_.reset (); - this->state_changed_ = 0; - } - else - { - // This is a hack... somewhere, under certain conditions (which - // I don't understand...) the mask will have all of its bits clear, - // yet have a size_ > 0. This is an attempt to remedy the affect, - // without knowing why it happens. - this->ready_set_.rd_mask_.sync (this->ready_set_.rd_mask_.max_set ()); - this->ready_set_.wr_mask_.sync (this->ready_set_.wr_mask_.max_set ()); - this->ready_set_.ex_mask_.sync (this->ready_set_.ex_mask_.max_set ()); - } - - return this->wait_for_multiple_events (this->ready_set_, - max_wait_time); -} int @@ -378,6 +283,7 @@ ACE_TP_Reactor::handle_timer_events (int &event_count, } + int ACE_TP_Reactor::handle_notify_events (int &event_count, ACE_TP_Token_Guard &guard) @@ -388,61 +294,83 @@ ACE_TP_Reactor::handle_notify_events (int &event_count, if (notify_handle != ACE_INVALID_HANDLE) { + // Clear the handle of the read_mask of our <ready_set_> this->ready_set_.rd_mask_.clr_bit (notify_handle); - // Decrement the number of events that needs handling yet. - event_count--; + // Now just do a read on the pipe.. + ACE_Notification_Buffer buffer; - // Release the token before dispatching notifies... - guard.release_token (); + if (this->notify_handler_->read_notify_pipe (notify_handle, + buffer) >= 0) + { + event_count--; + + // Release the token before dispatching notifies... + guard.release_token (); + + this->notify_handler_->dispatch_notify (buffer); + + this->renew (); + return 1; + } - // Dipatch and return - return - this->notify_handler_->dispatch_notify (notify_handle); + return 0; } return 0; } + int ACE_TP_Reactor::handle_socket_events (int &event_count, ACE_TP_Token_Guard &guard) { - ACE_TRACE ("ACE_TP_Reactor::handle_socket_events"); + // We got the lock, lets handle some events. Note: this method will + // *not* dispatch any I/O handlers. It will dispatch signals, + // timeouts, and notifications. ACE_EH_Dispatch_Info dispatch_info; - // Get the socket event dispatch information + this->get_socket_event_info (dispatch_info); + // If there is any event handler that is ready to be dispatched, the // dispatch information is recorded in dispatch_info. - int result = - this->get_socket_event_info (dispatch_info); - - - if (result) + if (dispatch_info.dispatch ()) { // Suspend the handler so that other threads don't start // dispatching it. - this->suspend_i (dispatch_info.handle_); + // Make sure we never suspend the notify_handler_ without holding + // the lock. + // @@ Actually, we don't even need to suspend the notify_handler_ + // here. But let me get it to work first. + if (dispatch_info.event_handler_ != this->notify_handler_) + this->suspend_i (dispatch_info.handle_); + } - // Decrement the number of events that needs handling yet. - event_count--; + /// Decrement the event left + --event_count; - // Release the token before dispatching notifies... - guard.release_token (); + // Release the lock. Others threads can start waiting. + guard.release_token (); - result = this->dispatch_socket_events (dispatch_info); + int result = 0; + // If there was an event handler ready, dispatch it. + if (dispatch_info.dispatch ()) + { + if (this->dispatch_socket_event (dispatch_info) == 0) + ++result; // Dispatched one more event int flag = 0; if (dispatch_info.event_handler_ != 0) { - flag = - dispatch_info.event_handler_->resume_handler (); + flag = + dispatch_info.event_handler_->resume_handler (); } if (dispatch_info.handle_ != ACE_INVALID_HANDLE && + dispatch_info.event_handler_ != this->notify_handler_ && flag == 0) this->resume_handler (dispatch_info.handle_); } @@ -450,67 +378,93 @@ ACE_TP_Reactor::handle_socket_events (int &event_count, return result; } +int +ACE_TP_Reactor::mask_ops (ACE_HANDLE handle, + ACE_Reactor_Mask mask, + int ops) +{ + ACE_TRACE ("ACE_TP_Reactor::mask_ops"); + ACE_MT (ACE_GUARD_RETURN (ACE_Select_Reactor_Token, + ace_mon, this->token_, -1)); -ACE_HANDLE -ACE_TP_Reactor::get_notify_handle (void) + int result = 0; + + // If it looks like the handle isn't suspended, then + // set the ops on the wait_set_, otherwise set the suspend_set_. + + if (this->suspend_set_.rd_mask_.is_set (handle) == 0 + && this->suspend_set_.wr_mask_.is_set (handle) == 0 + && this->suspend_set_.ex_mask_.is_set (handle) == 0) + + result = this->bit_ops (handle, mask, + this->wait_set_, + ops); + else + + result = this->bit_ops (handle, mask, + this->suspend_set_, + ops); + + return result; +} + + + +int +ACE_TP_Reactor::get_event_for_dispatching (ACE_Time_Value *max_wait_time) { - // Call the notify handler to get a handle on which we would have a - // notify waiting - ACE_HANDLE read_handle = - this->notify_handler_->notify_handle (); - // Check whether the rd_mask has been set on that handle. If so - // return the handle. - if (read_handle != ACE_INVALID_HANDLE && - this->ready_set_.rd_mask_.is_set (read_handle)) + // If the reactor handler state has changed, clear any remembered + // ready bits and re-scan from the master wait_set. + if (this->state_changed_) { - return read_handle; + this->ready_set_.rd_mask_.reset (); + this->ready_set_.wr_mask_.reset (); + this->ready_set_.ex_mask_.reset (); + this->state_changed_ = 0; } + else + { + // This is a hack... somewhere, under certain conditions (which + // I don't understand...) the mask will have all of its bits clear, + // yet have a size_ > 0. This is an attempt to remedy the affect, + // without knowing why it happens. - return ACE_INVALID_HANDLE; -} - + //# if !(defined (__SUNPRO_CC) && (__SUNPRO_CC > 0x500)) + // SunCC seems to be having problems with this piece of code + // here. I am not sure why though. This works fine with other + // compilers. As we dont seem to understand when this piece of + // code is needed and as it creates problems for SunCC we will + // not compile this. Most of the tests in TAO seem to be happy + // without this in SunCC. + this->ready_set_.rd_mask_.sync (this->ready_set_.rd_mask_.max_set ()); + this->ready_set_.wr_mask_.sync (this->ready_set_.wr_mask_.max_set ()); + this->ready_set_.ex_mask_.sync (this->ready_set_.ex_mask_.max_set ()); + //# endif /* ! __SUNPRO_CC */ + } + return this->wait_for_multiple_events (this->ready_set_, + max_wait_time); +} int ACE_TP_Reactor::get_socket_event_info (ACE_EH_Dispatch_Info &event) { + event.reset (); // Nothing to dispatch yet // Check for dispatch in write, except, read. Only catch one, but if // one is caught, be sure to clear the handle from each mask in case // there is more than one mask set for it. This would cause problems // if the handler is suspended for dispatching, but its set bit in // another part of ready_set_ kept it from being dispatched. + int found_io = 0; ACE_HANDLE handle; - // Look at the read masks - { - ACE_Handle_Set_Iterator handle_iter (this->ready_set_.rd_mask_); - - while ((handle = handle_iter ()) != ACE_INVALID_HANDLE) - { - if (this->is_suspended_i (handle)) - continue; - - // Remember this info - event.set (handle, - this->handler_rep_.find (handle), - ACE_Event_Handler::READ_MASK, - &ACE_Event_Handler::handle_input); - this->ready_set_.rd_mask_.clr_bit (handle); - this->ready_set_.wr_mask_.clr_bit (handle); - this->ready_set_.ex_mask_.clr_bit (handle); - return 1; - } - } - - // We havent found any rd_masks for processing yet, so look for - // write masks { ACE_Handle_Set_Iterator handle_iter (this->ready_set_.wr_mask_); - while ((handle = handle_iter ()) != ACE_INVALID_HANDLE) + while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE) { if (this->is_suspended_i (handle)) continue; @@ -523,45 +477,67 @@ ACE_TP_Reactor::get_socket_event_info (ACE_EH_Dispatch_Info &event) this->ready_set_.wr_mask_.clr_bit (handle); this->ready_set_.ex_mask_.clr_bit (handle); this->ready_set_.rd_mask_.clr_bit (handle); - return 1; + found_io = 1; } } - // We havent found any rd_mask and wr_masks for processing yet, so - // look for ex_masks - { - ACE_Handle_Set_Iterator handle_iter (this->ready_set_.ex_mask_); + if (!found_io) + { + ACE_Handle_Set_Iterator handle_iter (this->ready_set_.ex_mask_); - while ((handle = handle_iter ()) != ACE_INVALID_HANDLE) - { - if (this->is_suspended_i (handle)) - continue; + while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE) + { + if (this->is_suspended_i (handle)) + continue; + + // Remember this info + event.set (handle, + this->handler_rep_.find (handle), + ACE_Event_Handler::EXCEPT_MASK, + &ACE_Event_Handler::handle_exception); + this->ready_set_.ex_mask_.clr_bit (handle); + this->ready_set_.wr_mask_.clr_bit (handle); + this->ready_set_.rd_mask_.clr_bit (handle); + found_io = 1; + } + } - // Remember this info - event.set (handle, - this->handler_rep_.find (handle), - ACE_Event_Handler::EXCEPT_MASK, - &ACE_Event_Handler::handle_exception); - this->ready_set_.ex_mask_.clr_bit (handle); - this->ready_set_.wr_mask_.clr_bit (handle); - this->ready_set_.rd_mask_.clr_bit (handle); - return 1; - } + if (!found_io) + { + ACE_Handle_Set_Iterator handle_iter (this->ready_set_.rd_mask_); + + while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE) + { + if (this->is_suspended_i (handle)) + continue; + + // Remember this info + event.set (handle, + this->handler_rep_.find (handle), + ACE_Event_Handler::READ_MASK, + &ACE_Event_Handler::handle_input); + this->ready_set_.rd_mask_.clr_bit (handle); + this->ready_set_.wr_mask_.clr_bit (handle); + this->ready_set_.ex_mask_.clr_bit (handle); + found_io = 1; + } } - // We didnt find any.. - return 0; + return found_io; } + + +// Dispatches a single event handler int -ACE_TP_Reactor::dispatch_socket_events (ACE_EH_Dispatch_Info &info) +ACE_TP_Reactor::dispatch_socket_event (ACE_EH_Dispatch_Info &dispatch_info) { - ACE_TRACE ("ACE_TP_Reactor::dispatch_socket_events"); + ACE_TRACE ("ACE_TP_Reactor::notify_handle"); - ACE_HANDLE handle = info.handle_; - ACE_Event_Handler *event_handler = info.event_handler_; - ACE_Reactor_Mask mask = info.mask_; - ACE_EH_PTMF callback = info.callback_; + ACE_HANDLE handle = dispatch_info.handle_; + ACE_Event_Handler *event_handler = dispatch_info.event_handler_; + ACE_Reactor_Mask mask = dispatch_info.mask_; + ACE_EH_PTMF callback = dispatch_info.callback_; // Check for removed handlers. if (event_handler == 0) @@ -583,17 +559,37 @@ ACE_TP_Reactor::dispatch_socket_events (ACE_EH_Dispatch_Info &info) this->remove_handler (handle, mask); // As the handler is no longer valid, invalidate the handle - info.event_handler_ = 0; - info.handle_ = ACE_INVALID_HANDLE; + dispatch_info.event_handler_ = 0; + dispatch_info.handle_ = ACE_INVALID_HANDLE; return retval; } + // assert (status >= 0); + return 0; +} + +int +ACE_TP_Reactor::resumable_handler (void) +{ return 1; } +ACE_INLINE int +ACE_TP_Reactor::handle_events (ACE_Time_Value &max_wait_time) +{ + return ACE_Select_Reactor::handle_events (max_wait_time); +} + +ACE_INLINE int +ACE_TP_Reactor::mask_ops (ACE_Event_Handler *eh, + ACE_Reactor_Mask mask, + int ops) +{ + return this->mask_ops (eh->get_handle (), mask, ops); +} -void +ACE_INLINE void ACE_TP_Reactor::notify_handle (ACE_HANDLE, ACE_Reactor_Mask, ACE_Handle_Set &, @@ -603,3 +599,30 @@ ACE_TP_Reactor::notify_handle (ACE_HANDLE, ACE_ERROR ((LM_ERROR, ACE_LIB_TEXT ("ACE_TP_Reactor::notify_handle: Wrong version of notify_handle() gets called"))); } + +ACE_HANDLE +ACE_TP_Reactor::get_notify_handle (void) +{ + // Call the notify handler to get a handle on which we would have a + // notify waiting + ACE_HANDLE read_handle = + this->notify_handler_->notify_handle (); + + // Check whether the rd_mask has been set on that handle. If so + // return the handle. + // if (read_handle != ACE_INVALID_HANDLE && + //this->ready_set_.rd_mask_.is_set (read_handle)) + if (read_handle != ACE_INVALID_HANDLE) + { + ACE_Handle_Set_Iterator handle_iter (this->ready_set_.rd_mask_); + ACE_HANDLE handle = ACE_INVALID_HANDLE; + + while ((handle = handle_iter ()) == read_handle) + { + return read_handle; + } + } + + // None found.. + return ACE_INVALID_HANDLE; +} diff --git a/ace/TP_Reactor.h b/ace/TP_Reactor.h index b916aa81a22..83f2fbcfd4b 100644 --- a/ace/TP_Reactor.h +++ b/ace/TP_Reactor.h @@ -32,13 +32,12 @@ #include "ace/pre.h" #include "ace/Select_Reactor.h" - +#include "ace/Log_Msg.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once #endif /* ACE_LACKS_PRAGMA_ONCE */ - /** * @class ACE_EH_Dispatch_Info * @@ -89,9 +88,7 @@ class ACE_Export ACE_TP_Token_Guard public: /// Constructor that will grab the token for us - ACE_TP_Token_Guard (ACE_Select_Reactor_Token &token, - ACE_Time_Value *max_wait_time, - int &result); + ACE_TP_Token_Guard (ACE_Select_Reactor_Token &token); /// Destructor. This will release the token if it hasnt been /// released till this point @@ -104,8 +101,6 @@ public: /// token or not. int is_owner (void); -private: - /// A helper method that grabs the token for us, after which the /// thread that owns that can do some actual work. int grab_token (ACE_Time_Value *max_wait_time); @@ -126,7 +121,6 @@ private: ACE_UNIMPLEMENTED_FUNC (ACE_TP_Token_Guard (void)) }; - /** * @class ACE_TP_Reactor * @@ -184,10 +178,6 @@ public: ACE_Timer_Queue * = 0, int mask_signals = 1); - // = Reactor calls - virtual void max_notify_iterations (int iter); - virtual int max_notify_iterations (void); - // = Event loop drivers. /** @@ -247,21 +237,31 @@ public: protected: // = Internal methods that do the actual work. + + /// Dispatch just 1 signal, timer, notification handlers + int dispatch_i (ACE_Time_Value *max_wait_time, + ACE_TP_Token_Guard &guard); + /// Get the event that needs dispatching.It could be either a /// signal, timer, notification handlers or return possibly 1 I/O /// handler for dispatching. In the most common use case, this would /// return 1 I/O handler for dispatching int get_event_for_dispatching (ACE_Time_Value *max_wait_time); + /// Method to handle signals + /// NOTE: It is just busted at this point in time. int handle_signals (int &event_count, ACE_TP_Token_Guard &g); + /// Handle timer events int handle_timer_events (int &event_count, ACE_TP_Token_Guard &g); + /// Handle notify events int handle_notify_events (int &event_count, ACE_TP_Token_Guard &g); + /// handle socket events int handle_socket_events (int &event_count, ACE_TP_Token_Guard &g); @@ -273,11 +273,16 @@ protected: ACE_EH_PTMF callback); private: + /// Get the handle of the notify pipe from the ready set if there is + /// an event in the notify pipe. ACE_HANDLE get_notify_handle (void); + /// Get socket event dispatch information. int get_socket_event_info (ACE_EH_Dispatch_Info &info); - int dispatch_socket_events (ACE_EH_Dispatch_Info &info); + /// Notify the appropriate <callback> in the context of the <eh> + /// associated with <handle> that a particular event has occurred. + int dispatch_socket_event (ACE_EH_Dispatch_Info &dispatch_info); private: /// Deny access since member-wise won't work... diff --git a/ace/TP_Reactor.i b/ace/TP_Reactor.i index f169ddca174..9cbec199f80 100644 --- a/ace/TP_Reactor.i +++ b/ace/TP_Reactor.i @@ -48,14 +48,11 @@ ACE_EH_Dispatch_Info::dispatch (void) const /************************************************************************/ ACE_INLINE -ACE_TP_Token_Guard::ACE_TP_Token_Guard (ACE_Select_Reactor_Token &token, - ACE_Time_Value *max_wait_time, - int &result) +ACE_TP_Token_Guard::ACE_TP_Token_Guard (ACE_Select_Reactor_Token &token) + :token_ (token), owner_ (0) { - result = this->grab_token (max_wait_time); - } ACE_INLINE @@ -71,7 +68,7 @@ ACE_TP_Token_Guard::~ACE_TP_Token_Guard (void) ACE_INLINE void ACE_TP_Token_Guard::release_token (void) { - if (this->owner_ == 1) + if (this->owner_) { ACE_MT (this->token_.release ()); @@ -90,7 +87,6 @@ ACE_TP_Token_Guard::is_owner (void) /************************************************************************/ // Methods for ACE_TP_Reactor /************************************************************************/ - ACE_INLINE void ACE_TP_Reactor::no_op_sleep_hook (void *) { |