diff options
author | schmidt <douglascraigschmidt@users.noreply.github.com> | 1997-02-17 01:02:17 +0000 |
---|---|---|
committer | schmidt <douglascraigschmidt@users.noreply.github.com> | 1997-02-17 01:02:17 +0000 |
commit | 83379a56bcf820a1b7b9974ec89c59915f1816d8 (patch) | |
tree | db2393337b28f0d06437fd2da365060a11a2e112 | |
parent | c685695d30d92cc376ab180641e5faf73cd2b8c1 (diff) | |
download | ATCD-83379a56bcf820a1b7b9974ec89c59915f1816d8.tar.gz |
foo
-rw-r--r-- | ChangeLog-97a | 74 | ||||
-rw-r--r-- | README | 2 | ||||
-rw-r--r-- | ace/ACE.cpp | 8 | ||||
-rw-r--r-- | ace/Log_Msg.h | 2 | ||||
-rw-r--r-- | ace/OS.i | 2 | ||||
-rw-r--r-- | ace/Reactor.cpp | 381 | ||||
-rw-r--r-- | ace/Reactor.h | 67 | ||||
-rw-r--r-- | ace/ReactorEx.cpp | 275 | ||||
-rw-r--r-- | ace/ReactorEx.h | 58 | ||||
-rw-r--r-- | ace/SOCK_Stream.h | 23 | ||||
-rw-r--r-- | ace/SOCK_Stream.i | 2 | ||||
-rw-r--r-- | ace/TTY_IO.cpp | 8 | ||||
-rw-r--r-- | apps/Gateway/Gateway/proxy_config | 4 | ||||
-rw-r--r-- | apps/Gateway/Gateway/svc.conf | 2 | ||||
-rw-r--r-- | examples/Reactor/Misc/test_signals_1.cpp | 23 | ||||
-rw-r--r-- | examples/Reactor/ReactorEx/test_MT.cpp | 15 | ||||
-rw-r--r-- | examples/Reactor/WFMO_Reactor/test_MT.cpp | 15 | ||||
-rw-r--r-- | tests/Future_Test.cpp | 1 | ||||
-rw-r--r-- | tests/Makefile | 1 | ||||
-rw-r--r-- | tests/Reactor_Notify_Test.cpp | 204 | ||||
-rw-r--r-- | tests/Reactors_Test.cpp | 11 | ||||
-rw-r--r-- | tests/Reader_Writer_Test.cpp | 4 | ||||
-rwxr-xr-x | tests/run_tests.sh | 1 |
23 files changed, 910 insertions, 273 deletions
diff --git a/ChangeLog-97a b/ChangeLog-97a index bff681bda09..ee2c167a18a 100644 --- a/ChangeLog-97a +++ b/ChangeLog-97a @@ -1,3 +1,77 @@ +Sun Feb 16 12:23:23 1997 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu> + + * ace/Reactor.cpp: Totally rewrote the ACE_Reactor's dispatching + mechanism so that it now keeps track of whether the state of the + wait_set_ has changed during a dispatch (i.e., whenever + register_handler_i() or remove_handler_i() is called). If the + wait_set_ state *has* changed, then we bail out and rerun + select() in order to get the latest changes. + + * ace/Reactor.cpp: Changed the implementation of the + ACE_Reactor_Notify class so that (1) it short-circuits a trip + through the ACE_Reactor::notify_handle() method (after all, it's + just going to call its own handle_input() method back) and (2) + the ACE_Reactor_Notify::handle_input() method now returns a + count of the number of handlers that it dispatched. + + * ace/Log_Msg.h: Added a (%P|%t) so that we now print out the + process id and thread number for failed ACE_ASSERT() calls. + + * tests: Removed the unnecessary template specializations of + ACE_Atomic_Op<ACE_Thread_Mutex, int> since this is already done + in libACE. + + * ace/Reactor.cpp: Removed the #ifdef preventing the enabling of + non-blocking mode for the recv() side of the Reactor's + notification pipe (socket) for Win32. I believe that with the + new max_notify_iterations scheme we should be all set. + + * ace/ReactorEx.cpp: Added an identical API for bounding the + max_notify_iterations() for ReactorEx. + + * ace/Reactor.cpp: Enhanced the Reactor's notify() mechanism so + that it is now possible to set the max_notify_iterations(), + which limits the number of times that the + ACE_Reactor_Notify::handle_input() method will iterate and + dispatch the ACE_Event_Handlers that are passed in via the + notify pipe before breaking out of its recv() loop. This is + necessary to keep from starving out other Event_Handlers. + Thanks to Rod Skinner <rods@in.ot.com.au> for pointing out the + need for this. + + * ace/Reactor.cpp: Fixed a bug in the WIN32 + ACE_Reactor_Notify::handle_input() logic. We were calling + requeue_position(0) when we should have been calling renew(). + +Sat Feb 15 11:46:39 1997 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu> + + * ace/ACE.cpp: It appears that VxWorks doesn't support fcntl(). + However, it does seem to support ioctl(). Therefore, that's how + we'll set the descriptors into non-blocking mode. Thanks to + Dave Mayerhoefer <m210229@svappl36.mdc.com> for reporting this. + + * ace/SOCK_Stream.h: Corrected the documentation for + ACE_SOCK_Stream::recv_n (void *buf, size_t len, int flags, const + ACE_Time_Value *timeout). Thanks to Paul Roman + <proman@npac.syr.edu> for reporting this. + + * ace/SOCK_Stream.i (recv_n): Fixed a minor bug in the + SOCK_Stream.i line 38: + + ACE_TRACE ("ACE_SOCK_Stream::send_n"); + + should be + + ACE_TRACE ("ACE_SOCK_Stream::recv_n"); + + Thanks to Paul Roman <proman@npac.syr.edu> for reporting this. + +Fri Feb 14 00:40:14 1997 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu> + + * ace/TTY_IO.cpp (control): Moved the drop out timer value from + the ACE struct to the WinNT struct. Thanks to Brad Flood + <BFLOOD@tcs.lmco.com> for this fix. + Thu Feb 13 21:24:17 1997 <irfan@cha-cha.cs.wustl.edu> * ace/OS.i (event_wait): Fixed the missing check for success. @@ -473,6 +473,8 @@ Michael Maxie <maxie@acm.org> John Cosby <John.D.Cosby@cpmx.saic.com> Nigel Owen <Nigel@voicelink.co.nz> Jorn Jensen <jornj@funcom.com> +Paul Roman <proman@npac.syr.edu> +Dave Mayerhoefer <m210229@svappl36.mdc.com> I would particularly like to thank Paul Stephenson, who worked with me at Ericsson and is now at ObjectSpace. Paul devised the recursive diff --git a/ace/ACE.cpp b/ace/ACE.cpp index 96246b2886c..79b6ccbba59 100644 --- a/ace/ACE.cpp +++ b/ace/ACE.cpp @@ -942,7 +942,7 @@ int ACE::set_flags (ACE_HANDLE handle, int flags) { ACE_TRACE ("ACE::set_flags"); -#if defined (ACE_WIN32) +#if defined (ACE_WIN32) || defined (VXWORKS) switch (flags) { case ACE_NONBLOCK: @@ -969,7 +969,7 @@ ACE::set_flags (ACE_HANDLE handle, int flags) return -1; else return 0; -#endif /* ACE_WIN32 */ +#endif /* ACE_WIN32 && VXWORKS */ } // Flags are the file status flags to turn off. @@ -979,7 +979,7 @@ ACE::clr_flags (ACE_HANDLE handle, int flags) { ACE_TRACE ("ACE::clr_flags"); -#if defined (ACE_WIN32) +#if defined (ACE_WIN32) || defined (VXWORKS) switch (flags) { case ACE_NONBLOCK: @@ -1006,7 +1006,7 @@ ACE::clr_flags (ACE_HANDLE handle, int flags) return -1; else return 0; -#endif /* ACE_WIN32 */ +#endif /* ACE_WIN32 || VXWORKS */ } int diff --git a/ace/Log_Msg.h b/ace/Log_Msg.h index 3dc046aed3e..09bbc6c328e 100644 --- a/ace/Log_Msg.h +++ b/ace/Log_Msg.h @@ -28,7 +28,7 @@ #define ACE_ASSERT(X) \ do { if(!(X)) { int __ace_error = errno; ACE_Log_Msg *ace___ = ACE_Log_Msg::instance (); \ ace___->set (__FILE__, __LINE__, -1, __ace_error, ace___->restart (), ace___->msg_ostream ()); \ - ace___->log (LM_ERROR, "ACE_ASSERT: file %N, line %l assertion failed for '%s'.%a\n", #X, -1); \ + ace___->log (LM_ERROR, "(%P|%t) ACE_ASSERT: file %N, line %l assertion failed for '%s'.%a\n", #X, -1); \ } } while (0) #endif /* ACE_NDEBUG */ @@ -5882,7 +5882,7 @@ ACE_OS::ioctl (ACE_HANDLE handle, int cmd, void *val) ACE_SOCKET sock = (ACE_SOCKET) handle; ACE_SOCKCALL_RETURN (::ioctlsocket (sock, cmd, (u_long *) val), int, -1); #elif defined (VXWORKS) - // this may not work very well... + // This may not work very well... ACE_OSCALL_RETURN (ACE_ADAPT_RETVAL (::ioctl (handle, cmd, (int) val), ace_result_), int, -1); #else diff --git a/ace/Reactor.cpp b/ace/Reactor.cpp index e723529a776..d94e561d226 100644 --- a/ace/Reactor.cpp +++ b/ace/Reactor.cpp @@ -249,6 +249,11 @@ ACE_Reactor_Handler_Repository::bind (ACE_HANDLE handle, this->reactor_.wait_set_, ACE_Reactor::ADD_MASK); + // Note the fact that we've changed the state of the <wait_set_>, + // which is used by the dispatching loop to determine whether it can + // keep going or if it needs to reconsult select(). + this->reactor_.state_changed_ = 1; + // Assign *this* <Reactor> to the <Event_Handler>. event_handler->reactor (&this->reactor_); return 0; @@ -274,6 +279,11 @@ ACE_Reactor_Handler_Repository::unbind (ACE_HANDLE handle, this->reactor_.wait_set_, ACE_Reactor::CLR_MASK); + // Note the fact that we've changed the state of the <wait_set_>, + // which is used by the dispatching loop to determine whether it can + // keep going or if it needs to reconsult select(). + this->reactor_.state_changed_ = 1; + // Close down the <Event_Handler> unless we've been instructed not // to. if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::DONT_CALL) == 0) @@ -483,7 +493,12 @@ ACE_Reactor::requeue_position (int rp) { ACE_TRACE ("ACE_Reactor::requeue_position"); ACE_MT (ACE_GUARD (ACE_REACTOR_MUTEX, ace_mon, this->token_)); +#if defined (ACE_WIN32) + // Must always requeue ourselves "next" on Win32. + this->requeue_position_ = 0; +#else this->requeue_position_ = rp; +#endif /* ACE_WIN32 */ } int @@ -494,6 +509,33 @@ ACE_Reactor::requeue_position (void) return this->requeue_position_; } +void +ACE_Reactor::max_notify_iterations (int iterations) +{ + ACE_TRACE ("ACE_Reactor::max_notify_iterations"); + ACE_MT (ACE_GUARD (ACE_REACTOR_MUTEX, ace_mon, this->token_)); + +#if defined (ACE_WIN32) + // There seems to be a Win32 bug with non-blocking mode, so we'll + // always just read one notification at a time. + iterations = 1; +#else + // Must always be > 0 or < 0 to optimize the loop exit condition. + if (iterations == 0) + iterations = 1; +#endif /* ACE_WIN32 */ + + this->max_notify_iterations_ = iterations; +} + +int +ACE_Reactor::max_notify_iterations (void) +{ + ACE_TRACE ("ACE_Reactor::max_notify_iterations"); + ACE_MT (ACE_GUARD_RETURN (ACE_REACTOR_MUTEX, ace_mon, this->token_, -1)); + return this->max_notify_iterations_; +} + #if defined (ACE_MT_SAFE) // Enqueue ourselves into the list of waiting threads. void @@ -553,13 +595,12 @@ ACE_Reactor_Notify::open (ACE_Reactor *r) if (this->notification_pipe_.open () == -1) return -1; -#if !defined (ACE_WIN32) // There seems to be a Win32 bug with this... - // Set this into non-blocking mode. + // There seems to be a Win32 bug with this... Set this into + // non-blocking mode. if (ACE::set_flags (this->notification_pipe_.read_handle (), ACE_NONBLOCK) == -1) return -1; else -#endif /* !ACE_WIN32 */ return this->reactor_->register_handler (this->notification_pipe_.read_handle (), this, @@ -593,7 +634,8 @@ ACE_Reactor_Notify::notify (ACE_Event_Handler *eh, // Reactor. int -ACE_Reactor_Notify::handle_notifications (const ACE_Handle_Set &rd_mask) +ACE_Reactor_Notify::dispatch_notifications (int &number_of_active_handles, + const ACE_Handle_Set &rd_mask) { ACE_TRACE ("ACE_Reactor_Notify::handle_notification"); @@ -602,13 +644,8 @@ ACE_Reactor_Notify::handle_notifications (const ACE_Handle_Set &rd_mask) if (rd_mask.is_set (read_handle)) { - this->reactor_->notify_handle - (read_handle, - ACE_Event_Handler::READ_MASK, - this->reactor_->ready_set_.rd_mask_, - this->reactor_->handler_rep_.find (read_handle), - &ACE_Event_Handler::handle_input); - return 1; + number_of_active_handles--; + return this->handle_input (read_handle); } else return 0; @@ -628,18 +665,9 @@ ACE_Reactor_Notify::handle_input (ACE_HANDLE handle) ACE_Notification_Buffer buffer; ssize_t n; + int number_dispatched = 0; -#if defined (ACE_WIN32) - n = ACE::recv (handle, (char *) &buffer, sizeof buffer); - - if (n == -1) - return -1; - - // Put ourselves at the head of the queue. - this->reactor_->requeue_position (0); -#else while ((n = ACE::recv (handle, (char *) &buffer, sizeof buffer)) != -1) -#endif /* ACE_WIN32 */ { // If eh == 0 then another thread is unblocking the ACE_Reactor // to update the ACE_Reactor's internal structures. Otherwise, @@ -669,16 +697,38 @@ ACE_Reactor_Notify::handle_input (ACE_HANDLE handle) buffer.eh_->handle_close (ACE_INVALID_HANDLE, ACE_Event_Handler::EXCEPT_MASK); } + + number_dispatched++; + + // Bail out if we've reached the <notify_threshold_>. Note that + // by default <notify_threshold_> is -1, so we'll loop until all + // the notifications in the pipe have been dispatched. + if (number_dispatched == this->reactor_->max_notify_iterations_) + break; } // Enqueue ourselves into the list of waiting threads. When we // reacquire the token we'll be off and running again with ownership - // of the token. + // of the token. The postcondition of this call is that + // this->reactor_.token_.current_owner () == ACE_Thread::self (); this->reactor_->renew (); - // Postcondition: this->reactor_.token_.current_owner () == - // ACE_Thread::self (); - return n == -1 && errno != EWOULDBLOCK ? -1 : 0; + if (n == -1) + { + if (errno != EWOULDBLOCK) + { + // If we're returning -1 here something is seriously wrong! + ACE_ASSERT (!"something's totally wrong!\n"); + return -1; + } + else + { + ACE_DEBUG ((LM_DEBUG, "(%t) ++++ WOULD BLOCK +++++\n")); + return number_dispatched; + } + } + else + return number_dispatched; } #endif /* ACE_MT_SAFE */ @@ -899,8 +949,15 @@ ACE_Reactor::ACE_Reactor (ACE_Sig_Handler *sh, timer_queue_ (0), delete_timer_queue_ (0), delete_signal_handler_ (0), +#if defined (ACE_WIN32) + requeue_position_ (0), // Must always requeue ourselves "next" on Win32. + max_notify_iterations_ (1), +#else requeue_position_ (-1), // Requeue at end of waiters by default. - initialized_ (0) + max_notify_iterations_ (-1), +#endif /* ACE_WIN32 */ + initialized_ (0), + state_changed_ (0) #if defined (ACE_MT_SAFE) , token_ (*this) #endif /* ACE_MT_SAFE */ @@ -921,8 +978,15 @@ ACE_Reactor::ACE_Reactor (size_t size, timer_queue_ (0), delete_timer_queue_ (0), delete_signal_handler_ (0), +#if defined (ACE_WIN32) + requeue_position_ (0), // Must always requeue ourselves "next" on Win32. + max_notify_iterations_ (1), +#else requeue_position_ (-1), // Requeue at end of waiters by default. - initialized_ (0) + max_notify_iterations_ (-1), +#endif /* ACE_WIN32 */ + initialized_ (0), + state_changed_ (0) #if defined (ACE_MT_SAFE) , token_ (*this) #endif /* ACE_MT_SAFE */ @@ -1328,6 +1392,121 @@ ACE_Reactor::wait_for_multiple_events (ACE_Reactor_Handle_Set &dispatch_set, } int +ACE_Reactor::dispatch_timer_handlers (void) +{ + int number_dispatched = this->timer_queue_->expire (); + return this->state_changed_ ? -1 : number_dispatched; +} + +int +ACE_Reactor::dispatch_notification_handlers (int &number_of_active_handles, + ACE_Reactor_Handle_Set &dispatch_set) +{ +#if defined (ACE_MT_SAFE) + // Check to see if the ACE_HANDLE associated with the Reactor's + // notify hook is enabled. If so, it means that one or more + // other threads are trying to update the ACE_Reactor's internal + // tables. We'll handle all these threads and then break out to + // continue the event loop. + + int number_dispatched = + this->notify_handler_.dispatch_notifications (number_of_active_handles, + dispatch_set.rd_mask_); + return this->state_changed_ ? -1 : number_dispatched; +#else + return 0; +#endif /* ACE_MT_SAFE */ +} + +int +ACE_Reactor::dispatch_output_handlers (int &number_of_active_handles, + ACE_Reactor_Handle_Set &dispatch_set) +{ + ACE_HANDLE handle; + + int number_dispatched = 0; + + if (number_of_active_handles > 0) + { + // Handle output events (this code needs to come first + // to handle the obscure case of piggy-backed data + // coming along with the final handshake message of a + // nonblocking connection). + + for (ACE_Handle_Set_Iterator handle_iter_wr (dispatch_set.wr_mask_); + (handle = handle_iter_wr ()) != ACE_INVALID_HANDLE + && number_dispatched < number_of_active_handles; + ++handle_iter_wr) + { + number_dispatched++; + this->notify_handle (handle, + ACE_Event_Handler::WRITE_MASK, + this->ready_set_.wr_mask_, + this->handler_rep_.find (handle), + &ACE_Event_Handler::handle_output); + } + } + + number_of_active_handles -= number_dispatched; + return this->state_changed_ ? -1 : number_dispatched; +} + +int +ACE_Reactor::dispatch_exception_handlers (int &number_of_active_handles, + ACE_Reactor_Handle_Set &dispatch_set) +{ + ACE_HANDLE handle; + + int number_dispatched = 0; + + if (number_of_active_handles > 0) + { + // Handle "exceptional" events. + for (ACE_Handle_Set_Iterator handle_iter_ex (dispatch_set.ex_mask_); + (handle = handle_iter_ex ()) != ACE_INVALID_HANDLE + && number_dispatched < number_of_active_handles; + ++handle_iter_ex) + { + this->notify_handle (handle, + ACE_Event_Handler::EXCEPT_MASK, + this->ready_set_.ex_mask_, + this->handler_rep_.find (handle), + &ACE_Event_Handler::handle_exception); + number_dispatched++; + } + } + + number_of_active_handles -= number_dispatched; + return this->state_changed_ ? -1 : number_dispatched; +} + +int +ACE_Reactor::dispatch_input_handlers (int &number_of_active_handles, + ACE_Reactor_Handle_Set &dispatch_set) +{ + ACE_HANDLE handle; + + int number_dispatched = 0; + + if (number_of_active_handles > 0) + { + // Handle input, passive connection, and shutdown events. + for (ACE_Handle_Set_Iterator handle_iter_rd (dispatch_set.rd_mask_); + (handle = handle_iter_rd ()) != ACE_INVALID_HANDLE + && number_dispatched < number_of_active_handles; + ++handle_iter_rd) + this->notify_handle (handle, + ACE_Event_Handler::READ_MASK, + this->ready_set_.rd_mask_, + this->handler_rep_.find (handle), + &ACE_Event_Handler::handle_input); + } + + number_of_active_handles -= number_dispatched; + return this->state_changed_ ? -1 : number_dispatched; +} + +int ACE_Reactor::dispatch (int number_of_active_handles, ACE_Reactor_Handle_Set &dispatch_set) { @@ -1335,90 +1514,94 @@ ACE_Reactor::dispatch (int number_of_active_handles, int number_of_handlers_dispatched = 0; - for (;;) + // Keep dispatching while there are still active handles left. Note + // that the only way we should ever iterate more than once through + // this loop is if signals occur while we're dispatching other + // handlers. + // + // Note that we keep track of changes to our state. If any of the + // dispatch_*() methods below return -1 it means that the + // <wait_set_> state has changed as the result of an + // <ACE_Event_Handler> being dispatched. This means that we need to + // bail out and rerun the select() loop since our existing notion of + // handles in <dispatch_set> may no longer be correct. + // + // In the beginning, our state starts out unchanged. After every + // iteration (i.e., due to signals), our state again starts out + // unchanged. + + for (this->state_changed_ = 0; + number_of_active_handles > 0; + this->state_changed_ = 0) { - number_of_handlers_dispatched += number_of_active_handles; - - // Handle timers first since they may have higher latency - // constraints... - - number_of_handlers_dispatched += this->timer_queue_->expire (); + // Perform the Template Method for dispatching all the handlers. + // We use a loop here just to simplify the "breaking out" if we + // need to stop early. -#if defined (ACE_MT_SAFE) - // Check to see if the notify ACE_HANDLE is enabled. If so, it - // means that one or more other threads are trying to update the - // ACE_Reactor's internal tables. We'll handle all these - // threads and then break out to continue the event loop. - - if (this->notify_handler_.handle_notifications (dispatch_set.rd_mask_) == 0) -#endif /* ACE_MT_SAFE */ + do { - ACE_HANDLE handle; - - if (number_of_active_handles > 0) - { - // Handle output events (this code needs to come first - // to handle the obscure case of piggy-backed data - // coming along with the final handshake message of a - // nonblocking connection). - - for (ACE_Handle_Set_Iterator handle_iter_wr (dispatch_set.wr_mask_); - (handle = handle_iter_wr ()) != ACE_INVALID_HANDLE - && --number_of_active_handles >= 0; - ++handle_iter_wr) - this->notify_handle (handle, - ACE_Event_Handler::WRITE_MASK, - this->ready_set_.wr_mask_, - this->handler_rep_.find (handle), - &ACE_Event_Handler::handle_output); - } - - if (number_of_active_handles > 0) - { - // Handle "exceptional" events. - for (ACE_Handle_Set_Iterator handle_iter_ex (dispatch_set.ex_mask_); - (handle = handle_iter_ex ()) != ACE_INVALID_HANDLE - && --number_of_active_handles >= 0; - ++handle_iter_ex) - this->notify_handle (handle, - ACE_Event_Handler::EXCEPT_MASK, - this->ready_set_.ex_mask_, - this->handler_rep_.find (handle), - &ACE_Event_Handler::handle_exception); - } - - if (number_of_active_handles > 0) - { - // Handle input, passive connection, and shutdown events. - for (ACE_Handle_Set_Iterator handle_iter_rd (dispatch_set.rd_mask_); - (handle = handle_iter_rd ()) != ACE_INVALID_HANDLE - && --number_of_active_handles >= 0; - ++handle_iter_rd) - this->notify_handle (handle, - ACE_Event_Handler::READ_MASK, - this->ready_set_.rd_mask_, - this->handler_rep_.find (handle), - &ACE_Event_Handler::handle_input); - } + int result; + + // Handle timers first since they may have higher latency + // constraints. + + result = this->dispatch_timer_handlers (); + if (result == -1) + break; // State has changed, exit inner loop. + else + number_of_handlers_dispatched += result; + + result = this->dispatch_notification_handlers (number_of_active_handles, + dispatch_set); + if (result == -1) + break; // State has changed, exit inner loop. + else + number_of_handlers_dispatched += result; + + result = this->dispatch_output_handlers (number_of_active_handles, + dispatch_set); + + if (result <= 0) + // State has changed or there are no more handles to + // dispatch, exit inner loop. + break; + else + number_of_handlers_dispatched += result; + + result = this->dispatch_input_handlers (number_of_active_handles, + dispatch_set); + + if (result <= 0) + // State has changed or there are no more handles to + // dispatch, exit inner loop. + break; + else + number_of_handlers_dispatched += result; + + result = this->dispatch_exception_handlers (number_of_active_handles, + dispatch_set); + + if (result <= 0) + // State has changed or there are no more handles to + // dispatch, exit inner loop. + break; + else + number_of_handlers_dispatched += result; } + while (0); // If any HANDLES are activated as a result of signals they // should be dispatched since they may be time critical... if (ACE_Sig_Handler::sig_pending () != 0) - { - ACE_Sig_Handler::sig_pending (0); + { + ACE_Sig_Handler::sig_pending (0); number_of_active_handles = this->any_ready (dispatch_set); - - if (number_of_active_handles > 0) - // Loop back around again and redispatch activated - // handlers. - continue; - } - - return number_of_handlers_dispatched; + } } + + return number_of_handlers_dispatched; } int diff --git a/ace/Reactor.h b/ace/Reactor.h index 6a04175a489..17a78fcab69 100644 --- a/ace/Reactor.h +++ b/ace/Reactor.h @@ -73,7 +73,8 @@ public: int open (ACE_Reactor *); int close (void); - int handle_notifications (const ACE_Handle_Set &rd_mask); + int dispatch_notifications (int &number_of_active_handles, + const ACE_Handle_Set &rd_mask); // Handles pending threads (if any) that are waiting to unblock the // Reactor. @@ -515,6 +516,22 @@ public: // Get position that the main ACE_Reactor thread is requeued in the // list of waiters during a notify() callback. + void max_notify_iterations (int); + // Set the maximum number of times that the + // <ACE_Reactor_Notify::handle_input> method will iterate and + // dispatch the <ACE_Event_Handlers> that are passed in via the + // notify pipe before breaking out of its <recv> loop. By default, + // this is set to -1, which means "iterate until the pipe is empty." + // Setting this to a value like "1 or 2" will increase "fairness" + // (and thus prevent starvation) at the expense of slightly higher + // dispatching overhead. + + int max_notify_iterations (void); + // Get the maximum number of times that the + // <ACE_Reactor_Notify::handle_input> method will iterate and + // dispatch the <ACE_Event_Handlers> that are passed in via the + // notify pipe before breaking out of its <recv> loop. + // = Low-level wait_set mask manipulation methods. virtual int mask_ops (ACE_Event_Handler *eh, ACE_Reactor_Mask mask, @@ -630,6 +647,8 @@ protected: ACE_Time_Value *); // Wait for events to occur. + // = Dispatching methods. + virtual int dispatch (int nfound, ACE_Reactor_Handle_Set &); // Template Method that dispatches <ACE_Event_Handler>s for time @@ -637,6 +656,35 @@ protected: // of <ACE_Event_Handler>s that were dispatched or -1 if something // goes wrong. + virtual int dispatch_timer_handlers (void); + // Dispatch any expired timer handlers. Returns -1 if the state of + // the <wait_set_> has changed, else returns number of timer + // handlers dispatched. + + virtual int dispatch_notification_handlers (int &number_of_active_handles, + ACE_Reactor_Handle_Set &dispatch_set); + // Dispatch any notification handlers. Returns -1 if the state of + // the <wait_set_> has changed, else returns number of handlers + // notified. + + virtual int dispatch_output_handlers (int &number_of_active_handles, + ACE_Reactor_Handle_Set &dispatch_set); + // Dispatch any output handlers. Returns -1 if the state of the + // <wait_set_> has changed, else returns number of handlers + // dispatched. + + virtual int dispatch_exception_handlers (int &number_of_active_handles, + ACE_Reactor_Handle_Set &dispatch_set); + // Dispatch any exception handlers. Returns -1 if the state of the + // <wait_set_> has changed, else returns number of handlers + // dispatched. + + virtual int dispatch_input_handlers (int &number_of_active_handles, + ACE_Reactor_Handle_Set &dispatch_set); + // Dispatch any input handlers. Returns -1 if the state of the + // <wait_set_> has changed, else returns number of handlers + // dispatched. + virtual void notify_handle (ACE_HANDLE handle, ACE_Reactor_Mask mask, ACE_Handle_Set &, @@ -666,8 +714,8 @@ protected: // Tracks handles that are waited for by select(). ACE_Reactor_Handle_Set ready_set_; - // Track handles we are interested for various events that must be - // dispatched *without* requiring select(). + // Track HANDLES we are interested in for various events that must + // be dispatched *without* going through select(). int restart_; // Restart automatically when interrupted @@ -679,12 +727,25 @@ protected: // requeued at the front of the list. Else if it's > 1 then that // indicates the number of waiters to skip over. + int max_notify_iterations_; + // Keeps track of the maximum number of times that the + // <ACE_Reactor_Notify::handle_input> method will iterate and + // dispatch the <ACE_Event_Handlers> that are passed in via the + // notify pipe before breaking out of its <recv> loop. By default, + // this is set to -1, which means "iterate until the pipe is empty." + int initialized_; // True if we've been initialized yet... ACE_thread_t owner_; // The original thread that created this Reactor. + int state_changed_; + // True if state has changed during dispatching of + // <ACE_Event_Handlers>, else false. This is used to determine + // whether we need to make another trip through the <Reactor>'s + // <wait_for_multiple_events> loop. + #if defined (ACE_MT_SAFE) ACE_Reactor_Notify notify_handler_; // Callback object that unblocks the ACE_Reactor if it's sleeping. diff --git a/ace/ReactorEx.cpp b/ace/ReactorEx.cpp index 541ee8b13c2..7b502036df8 100644 --- a/ace/ReactorEx.cpp +++ b/ace/ReactorEx.cpp @@ -187,14 +187,14 @@ ACE_ReactorEx_Handler_Repository::bind (ACE_HANDLE handle, } int -ACE_ReactorEx_Handler_Repository::changes_required () +ACE_ReactorEx_Handler_Repository::changes_required (void) { // Check if handles have be scheduled for additions or removal return (this->handles_to_be_added_ > 0) || (this->handles_to_be_deleted_ > 0); } int -ACE_ReactorEx_Handler_Repository::make_changes () +ACE_ReactorEx_Handler_Repository::make_changes (void) { // This method must ONLY be called by the // <ReactorEx->change_state_thread_>. We therefore assume that there @@ -211,7 +211,7 @@ ACE_ReactorEx_Handler_Repository::make_changes () } int -ACE_ReactorEx_Handler_Repository::handle_deletions () +ACE_ReactorEx_Handler_Repository::handle_deletions (void) { // This will help us in keeping track of the last valid index in the // handle arrays @@ -250,7 +250,7 @@ ACE_ReactorEx_Handler_Repository::handle_deletions () } int -ACE_ReactorEx_Handler_Repository::handle_additions () +ACE_ReactorEx_Handler_Repository::handle_additions (void) { // Go through the <to_be_added_*> arrays for (int i = 0; i < this->handles_to_be_added_; i++) @@ -388,7 +388,7 @@ ACE_ReactorEx::~ACE_ReactorEx (void) } void -ACE_ReactorEx::wakeup_all_threads () +ACE_ReactorEx::wakeup_all_threads (void) { this->wakeup_all_threads_.signal (); } @@ -513,8 +513,160 @@ ACE_ReactorEx::ok_to_wait (ACE_Time_Value *max_wait_time, return 1; } +ACE_HANDLE +ACE_ReactorEx_Notify::get_handle (void) const +{ + return this->wakeup_one_thread_.handle (); +} + +// Handle all pending notifications. + +int +ACE_ReactorEx_Notify::handle_signal (int signum, + siginfo_t *siginfo, + ucontext_t *) +{ + ACE_UNUSED_ARG (signum); + + // Just check for sanity... + if (siginfo->si_handle_ != this->wakeup_one_thread_.handle ()) + return -1; + + for (int i = 1; ; i++) + { + ACE_Message_Block *mb = 0; + + if (this->message_queue_.dequeue_head + (mb, (ACE_Time_Value *) &ACE_Time_Value::zero) == -1) + { + if (errno == EWOULDBLOCK) + // We've reached the end of the processing, return + // normally. + return 0; + else + return -1; // Something weird happened... + } + else + { + ACE_Notification_Buffer *buffer = + (ACE_Notification_Buffer *) mb->base (); + + // If eh == 0 then we've got major problems! Otherwise, we + // need to dispatch the appropriate handle_* method on the + // ACE_Event_Handler pointer we've been passed. + + if (buffer->eh_ != 0) + { + int result = 0; + + switch (buffer->mask_) + { + case ACE_Event_Handler::READ_MASK: + result = buffer->eh_->handle_input (ACE_INVALID_HANDLE); + break; + case ACE_Event_Handler::WRITE_MASK: + result = buffer->eh_->handle_output (ACE_INVALID_HANDLE); + break; + case ACE_Event_Handler::EXCEPT_MASK: + result = buffer->eh_->handle_exception (ACE_INVALID_HANDLE); + break; + default: + ACE_ERROR ((LM_ERROR, "invalid mask = %d\n", buffer->mask_)); + break; + } + if (result == -1) + buffer->eh_->handle_close (ACE_INVALID_HANDLE, + ACE_Event_Handler::EXCEPT_MASK); + } + + // Make sure to delete the memory regardless of success or + // failure! + mb->release (); + + // Bail out if we've reached the <notify_threshold_>. Note + // that by default <notify_threshold_> is -1, so we'll loop + // until we're done. + if (i == this->max_notify_iterations_) + break; + } + } +} + +// Notify the ReactorEx, potentially enqueueing the +// <ACE_Event_Handler> for subsequent processing in the ReactorEx +// thread of control. + int -ACE_ReactorEx::update_state () +ACE_ReactorEx_Notify::notify (ACE_Event_Handler *eh, + ACE_Reactor_Mask mask, + ACE_Time_Value *timeout) +{ + if (eh != 0) + { + ACE_Message_Block *mb = 0; + ACE_NEW_RETURN (mb, ACE_Message_Block (sizeof ACE_Notification_Buffer), -1); + + ACE_Notification_Buffer *buffer = + (ACE_Notification_Buffer *) mb->base (); + buffer->eh_ = eh; + buffer->mask_ = mask; + + // Convert from relative time to absolute time by adding the + // current time of day. This is what <ACE_Message_Queue> + // expects. + if (timeout != 0) + *timeout += ACE_OS::gettimeofday (); + + if (this->message_queue_.enqueue_tail + (mb, timeout) == -1) + { + mb->release (); + return -1; + } + } + + return this->wakeup_one_thread_.signal (); +} + +void +ACE_ReactorEx_Notify::max_notify_iterations (int iterations) +{ + ACE_TRACE ("ACE_ReactorEx_Notify::max_notify_iterations"); + // Must always be > 0 or < 0 to optimize the loop exit condition. + if (iterations == 0) + iterations = 1; + + this->max_notify_iterations_ = iterations; +} + +int +ACE_ReactorEx_Notify::max_notify_iterations (void) +{ + ACE_TRACE ("ACE_ReactorEx_Notify::max_notify_iterations"); + return this->max_notify_iterations_; +} + +void +ACE_ReactorEx::max_notify_iterations (int iterations) +{ + ACE_TRACE ("ACE_ReactorEx::max_notify_iterations"); + ACE_GUARD_RETURN (ACE_Process_Mutex, monitor, this->lock_, -1); + + // Must always be > 0 or < 0 to optimize the loop exit condition. + this->notify_handler_.max_notify_iterations (iterations); +} + +int +ACE_ReactorEx::max_notify_iterations (void) +{ + ACE_TRACE ("ACE_ReactorEx::max_notify_iterations"); + ACE_GUARD_RETURN (ACE_Process_Mutex, monitor, this->lock_, -1); + + return this->notify_handler_.max_notify_iterations (); +} + +int +ACE_ReactorEx::update_state (void) { // This GUARD is necessary since we are updating shared state. ACE_GUARD_RETURN (ACE_Process_Mutex, monitor, this->lock_, -1); @@ -684,7 +836,8 @@ ACE_ReactorEx::dispatch_handler (int index) // ************************************************************ -ACE_ReactorEx_Notify::ACE_ReactorEx_Notify () +ACE_ReactorEx_Notify::ACE_ReactorEx_Notify (void) + : max_notify_iterations (-1) { } @@ -694,112 +847,4 @@ ACE_ReactorEx_Notify::open (ACE_ReactorEx &reactorEx) return reactorEx.register_handler (this); } -ACE_HANDLE -ACE_ReactorEx_Notify::get_handle (void) const -{ - return this->wakeup_one_thread_.handle (); -} - -// Handle all pending notifications. - -int -ACE_ReactorEx_Notify::handle_signal (int signum, - siginfo_t *siginfo, - ucontext_t *) -{ - ACE_UNUSED_ARG (signum); - - // Just check for sanity... - if (siginfo->si_handle_ != this->wakeup_one_thread_.handle ()) - return -1; - - for (;;) - { - ACE_Message_Block *mb = 0; - - if (this->message_queue_.dequeue_head - (mb, (ACE_Time_Value *) &ACE_Time_Value::zero) == -1) - { - if (errno == EWOULDBLOCK) - // We've reached the end of the processing, return - // normally. - return 0; - else - return -1; // Something weird happened... - } - else - { - ACE_Notification_Buffer *buffer = - (ACE_Notification_Buffer *) mb->base (); - - // If eh == 0 then we've got major problems! Otherwise, we - // need to dispatch the appropriate handle_* method on the - // ACE_Event_Handler pointer we've been passed. - - if (buffer->eh_ != 0) - { - int result = 0; - - switch (buffer->mask_) - { - case ACE_Event_Handler::READ_MASK: - result = buffer->eh_->handle_input (ACE_INVALID_HANDLE); - break; - case ACE_Event_Handler::WRITE_MASK: - result = buffer->eh_->handle_output (ACE_INVALID_HANDLE); - break; - case ACE_Event_Handler::EXCEPT_MASK: - result = buffer->eh_->handle_exception (ACE_INVALID_HANDLE); - break; - default: - ACE_ERROR ((LM_ERROR, "invalid mask = %d\n", buffer->mask_)); - break; - } - if (result == -1) - buffer->eh_->handle_close (ACE_INVALID_HANDLE, - ACE_Event_Handler::EXCEPT_MASK); - } - // Make sure to delete the memory regardless of success or - // failure! - mb->release (); - } - } -} - -// Notify the ReactorEx, potentially enqueueing the -// <ACE_Event_Handler> for subsequent processing in the ReactorEx -// thread of control. - -int -ACE_ReactorEx_Notify::notify (ACE_Event_Handler *eh, - ACE_Reactor_Mask mask, - ACE_Time_Value *timeout) -{ - if (eh != 0) - { - ACE_Message_Block *mb = 0; - ACE_NEW_RETURN (mb, ACE_Message_Block (sizeof ACE_Notification_Buffer), -1); - - ACE_Notification_Buffer *buffer = - (ACE_Notification_Buffer *) mb->base (); - buffer->eh_ = eh; - buffer->mask_ = mask; - - // Convert from relative time to absolute time by adding the - // current time of day. This is what <ACE_Message_Queue> - // expects. - if (timeout != 0) - *timeout += ACE_OS::gettimeofday (); - - if (this->message_queue_.enqueue_tail - (mb, timeout) == -1) - { - mb->release (); - return -1; - } - } - - return this->wakeup_one_thread_.signal (); -} - #endif /* ACE_WIN32 */ diff --git a/ace/ReactorEx.h b/ace/ReactorEx.h index 5c3040c32bf..dd7ce71a135 100644 --- a/ace/ReactorEx.h +++ b/ace/ReactorEx.h @@ -86,10 +86,10 @@ public: // Pointer to the beginning of the current array of // <ACE_Event_Handler> *'s. - virtual int changes_required (); + virtual int changes_required (void); // Check if changes to the handle set are required. - virtual int make_changes (); + virtual int make_changes (void); // Make changes to the handle set void dump (void) const; @@ -99,10 +99,10 @@ private: ACE_ReactorEx &reactorEx_; // Reference to our <ReactorEx>. - int handle_deletions (); + int handle_deletions (void); // Add handles to the handle set - int handle_additions (); + int handle_additions (void); // Remove handles from the handle set int remove_handler_i (size_t index, @@ -177,6 +177,24 @@ public: virtual ACE_HANDLE get_handle (void) const; // Returns a handle to the <ACE_Auto_Event>. + void max_notify_iterations (int); + // Set the maximum number of times that the + // <ACE_ReactorEx_Notify::handle_input> method will iterate and + // dispatch the <ACE_Event_Handlers> that are passed in via the + // notify queue before breaking out of its + // <ACE_Message_Queue::dequeue> loop. By default, this is set to + // -1, which means "iterate until the queue is empty." Setting this + // to a value like "1 or 2" will increase "fairness" (and thus + // prevent starvation) at the expense of slightly higher dispatching + // overhead. + + int max_notify_iterations (void); + // Get the maximum number of times that the + // <ACE_ReactorEx_Notify::handle_input> method will iterate and + // dispatch the <ACE_Event_Handlers> that are passed in via the + // notify queue before breaking out of its + // <ACE_Message_Queue::dequeue> loop. + private: virtual int handle_signal (int signum, siginfo_t * = 0, ucontext_t * = 0); // Called when the notification event waited on by <ACE_ReactorEx> @@ -193,6 +211,14 @@ private: // Message queue that keeps track of pending <ACE_Event_Handlers>. // This queue must be thread-safe because it can be called by // multiple threads of control. + + int max_notify_iterations_; + // Keeps track of the maximum number of times that the + // <ACE_ReactorEx_Notify::handle_input> method will iterate and + // dispatch the <ACE_Event_Handlers> that are passed in via the + // notify queue before breaking out of its + // <ACE_Message_Queue::dequeue> loop. By default, this is set to + // -1, which means "iterate until the queue is empty." }; #if defined (ACE_WIN32) @@ -345,6 +371,24 @@ public: // 0, the caller will block until action is possible, else will wait // until the relative time specified in <timeout> elapses). + void max_notify_iterations (int); + // Set the maximum number of times that the + // <ACE_ReactorEx_Notify::handle_input> method will iterate and + // dispatch the <ACE_Event_Handlers> that are passed in via the + // notify queue before breaking out of its + // <ACE_Message_Queue::dequeue> loop. By default, this is set to + // -1, which means "iterate until the queue is empty." Setting this + // to a value like "1 or 2" will increase "fairness" (and thus + // prevent starvation) at the expense of slightly higher dispatching + // overhead. + + int max_notify_iterations (void); + // Get the maximum number of times that the + // <ACE_ReactorEx_Notify::handle_input> method will iterate and + // dispatch the <ACE_Event_Handlers> that are passed in via the + // notify queue before breaking out of its + // <ACE_Message_Queue::dequeue> loop. + void dump (void) const; // Dump the state of an object. @@ -360,7 +404,7 @@ public: protected: virtual int ok_to_wait (ACE_Time_Value *max_wait_time, int alertable); - // Check to see if it is ok to enter ::WaitForMultipleObjects() + // Check to see if it is ok to enter ::WaitForMultipleObjects(). virtual int wait_for_multiple_events (ACE_Time_Value *max_wait_time, int alertable); @@ -386,10 +430,10 @@ private: int calculate_timeout (ACE_Time_Value *time); // Used to caluculate the next timeout - int update_state (); + int update_state (void); // Update the state of the handler repository - void wakeup_all_threads (); + void wakeup_all_threads (void); // Wake up all threads in WaitForMultipleObjects so that they can // reconsult the handle set diff --git a/ace/SOCK_Stream.h b/ace/SOCK_Stream.h index 4111c01d12e..aa9bd905047 100644 --- a/ace/SOCK_Stream.h +++ b/ace/SOCK_Stream.h @@ -41,7 +41,9 @@ public: ssize_t recv_n (void *buf, int n, int flags) const; // Recv n bytes, keep trying until n are received. - ssize_t send_n (const void *buf, size_t len, int flags, + ssize_t send_n (const void *buf, + size_t len, + int flags, const ACE_Time_Value *timeout); // Try to send exactly <len> bytes into <buf> from <handle> (uses // the <send> call). If <send> blocks for longer than <timeout> the @@ -49,12 +51,19 @@ public: // If a timeout does not occur, <send_n> return <len> (i.e., the // number of bytes requested to be sent). - ssize_t recv_n (void *buf, size_t len, int flags, + ssize_t recv_n (void *buf, + size_t len, + int flags, const ACE_Time_Value *timeout); - // Wait up to <timeout> amount of time to receive up to <len> bytes - // into <buf> from <handle> (uses the <recv> call). If <recv> times - // out a -1 is returned with <errno == ETIME>. If it succeeds the - // number of bytes received is returned. + // Try to recv exactly <len> bytes into <buf> from <handle> (uses + // the <ACE::recv_n> call). The <ACE_Time_Value> indicates how long + // to blocking trying to receive. If <timeout> == 0, the caller + // will block until action is possible, else will wait until the + // relative time specified in *<timeout> elapses). If <recv> blocks + // for longer than <timeout> the number of bytes actually read is + // returned with <errno == ETIME>. If a timeout does not occur, + // <recv_n> return <len> (i.e., the number of bytes requested to be + // read). // = Send/receive an ``urgent'' character (see TCP specs...). ssize_t send_urg (void *ptr, int len = sizeof (char)); @@ -69,7 +78,7 @@ public: int close (void); // Close down the socket (we need this to make things work correctly // on Win32, which requires use to do a <close_writer> before doing - // the close in order to avoid losing data). + // the close to avoid losing data). // = Meta-type info typedef ACE_INET_Addr PEER_ADDR; diff --git a/ace/SOCK_Stream.i b/ace/SOCK_Stream.i index 6bf38fbaf26..d916914da4a 100644 --- a/ace/SOCK_Stream.i +++ b/ace/SOCK_Stream.i @@ -35,7 +35,7 @@ inline ssize_t ACE_SOCK_Stream::recv_n (void *buf, size_t len, int flags, const ACE_Time_Value *timeout) { - ACE_TRACE ("ACE_SOCK_Stream::send_n"); + ACE_TRACE ("ACE_SOCK_Stream::recv_n"); return ACE::recv_n (this->get_handle (), buf, len, flags, timeout); } diff --git a/ace/TTY_IO.cpp b/ace/TTY_IO.cpp index b0a2f3ec20b..c2cb94dcfcb 100644 --- a/ace/TTY_IO.cpp +++ b/ace/TTY_IO.cpp @@ -193,7 +193,13 @@ ACE_TTY_IO::control (Control_Mode cmd, dcb.fRtsControl = RTS_CONTROL_DISABLE ; } dcb.fBinary = TRUE ; - return ::SetCommState (this->get_handle (), &dcb); + ::SetCommState (this->get_handle (), &dcb); + + // 2/13/97 BWF added drop out timer + COMMTIMEOUTS timeouts; + ::GetCommTimeouts (this->get_handle(), &timeouts) ; + timeouts.ReadIntervalTimeout = arg->readtimeoutmsec ; + return ::SetCommTimeouts (this->get_handle (), &timeouts) ; case GETPARAMS: ACE_NOTSUP_RETURN (-1); // Not yet implemented. diff --git a/apps/Gateway/Gateway/proxy_config b/apps/Gateway/Gateway/proxy_config index fa4fcb13a91..ae3a26a7f07 100644 --- a/apps/Gateway/Gateway/proxy_config +++ b/apps/Gateway/Gateway/proxy_config @@ -34,8 +34,8 @@ # Proxy Host Remote Proxy Max Retry Local Priority # ID Port Role Timeout Port # ---- -------- ------ ------ ---------- ----- -------- - 1 tango.cs 10003 S 32 0 1 - 2 tango.cs 10002 C 32 0 1 + 1 tango.cs 10011 S 32 0 1 + 2 tango.cs 10010 C 32 0 1 # 3 mambo.cs 10002 C 32 0 1 # 4 lambada.cs 10002 C 32 0 1 # 5 lambada.cs 10002 C 32 0 1 diff --git a/apps/Gateway/Gateway/svc.conf b/apps/Gateway/Gateway/svc.conf index c822713287b..9ec4dc429dc 100644 --- a/apps/Gateway/Gateway/svc.conf +++ b/apps/Gateway/Gateway/svc.conf @@ -1,3 +1,3 @@ #static Svc_Manager "-d -p 2913" -dynamic Gateway Service_Object * ./Gateway:_make_Gateway() active "-d -c -P proxy_config -C consumer_config" +dynamic Gateway Service_Object * ./Gateway:_make_Gateway() active "-d -c -P proxy_config -C consumer_config -tOUTPUT_MT" diff --git a/examples/Reactor/Misc/test_signals_1.cpp b/examples/Reactor/Misc/test_signals_1.cpp index 710c07a3eff..fcb7ba8d05f 100644 --- a/examples/Reactor/Misc/test_signals_1.cpp +++ b/examples/Reactor/Misc/test_signals_1.cpp @@ -1,5 +1,13 @@ +// This simple program illustrates the difference between handling +// signals via the Reactor (which doesn't cause the event loop to +// terminate) and signals that aren't handled via the Reactor (which +// do). + #include "ace/Service_Config.h" +// Number of times to allow signal to execute until we quit. +static size_t count = 10; + static void my_signal_function (int sig) { @@ -13,7 +21,14 @@ public: siginfo_t *, ucontext_t *) { - ACE_DEBUG ((LM_DEBUG, "Executed ACE signal handler for sig %S\n", sig)); + ACE_DEBUG ((LM_DEBUG, "Executed ACE signal handler for sig %S, count = %d\n", + sig, count)); + + count--; + + if (count == 0) + ACE_Service_Config::end_reactor_event_loop (); + return 0; } @@ -54,10 +69,10 @@ main (int, char *argv[]) // This just executes the reactor events until my_handler tells us // we are finished. - ACE_DEBUG ((LM_DEBUG, "starting event loop\n")); + ACE_DEBUG ((LM_DEBUG, "starting event loop that runs until you've typed ^C a total of 10 times or ^\\ once.\n")); - int result = my_config.run_reactor_event_loop (); + while (my_config.reactor_event_loop_done () == 0) + my_config.run_reactor_event_loop (); - ACE_DEBUG ((LM_DEBUG, "result = %d\n", result)); return 0; } diff --git a/examples/Reactor/ReactorEx/test_MT.cpp b/examples/Reactor/ReactorEx/test_MT.cpp index 6a7975c67dc..f3c910705bc 100644 --- a/examples/Reactor/ReactorEx/test_MT.cpp +++ b/examples/Reactor/ReactorEx/test_MT.cpp @@ -66,8 +66,8 @@ parse_args (int argc, char **argv) class Task_Handler : public ACE_Task<ACE_NULL_SYNCH> { public: - Task_Handler (int number_of_handles, - int concurrent_threads); + Task_Handler (size_t number_of_handles, + size_t concurrent_threads); // Constructor. ~Task_Handler (void); @@ -79,7 +79,7 @@ public: int svc (void); // Task event loop. - int signal (int index); + int signal (size_t index); // Signal an event. private: @@ -96,11 +96,12 @@ Task_Handler::svc (void) ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "handle_events"), 0); } -Task_Handler::Task_Handler (int number_of_handles, - int concurrent_threads) +Task_Handler::Task_Handler (size_t number_of_handles, + size_t int concurrent_threads) { ACE_NEW (this->events_, ACE_Auto_Event [number_of_handles]); - for (int i = 0; i < number_of_handles; i++) + + for (size_t i = 0; i < number_of_handles; i++) { if (ACE_Service_Config::reactorEx ()->register_handler (this, this->events_[i].handle ()) == -1) @@ -144,7 +145,7 @@ Task_Handler::handle_signal (int signum, siginfo_t *siginfo, ucontext_t *) } int -Task_Handler::signal (int index) +Task_Handler::signal (size_T index) { return this->events_[index].signal (); } diff --git a/examples/Reactor/WFMO_Reactor/test_MT.cpp b/examples/Reactor/WFMO_Reactor/test_MT.cpp index 6a7975c67dc..f3c910705bc 100644 --- a/examples/Reactor/WFMO_Reactor/test_MT.cpp +++ b/examples/Reactor/WFMO_Reactor/test_MT.cpp @@ -66,8 +66,8 @@ parse_args (int argc, char **argv) class Task_Handler : public ACE_Task<ACE_NULL_SYNCH> { public: - Task_Handler (int number_of_handles, - int concurrent_threads); + Task_Handler (size_t number_of_handles, + size_t concurrent_threads); // Constructor. ~Task_Handler (void); @@ -79,7 +79,7 @@ public: int svc (void); // Task event loop. - int signal (int index); + int signal (size_t index); // Signal an event. private: @@ -96,11 +96,12 @@ Task_Handler::svc (void) ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "handle_events"), 0); } -Task_Handler::Task_Handler (int number_of_handles, - int concurrent_threads) +Task_Handler::Task_Handler (size_t number_of_handles, + size_t int concurrent_threads) { ACE_NEW (this->events_, ACE_Auto_Event [number_of_handles]); - for (int i = 0; i < number_of_handles; i++) + + for (size_t i = 0; i < number_of_handles; i++) { if (ACE_Service_Config::reactorEx ()->register_handler (this, this->events_[i].handle ()) == -1) @@ -144,7 +145,7 @@ Task_Handler::handle_signal (int signum, siginfo_t *siginfo, ucontext_t *) } int -Task_Handler::signal (int index) +Task_Handler::signal (size_T index) { return this->events_[index].signal (); } diff --git a/tests/Future_Test.cpp b/tests/Future_Test.cpp index 1195afb287d..c971d31df8f 100644 --- a/tests/Future_Test.cpp +++ b/tests/Future_Test.cpp @@ -284,7 +284,6 @@ Scheduler::work (u_long newparam, int newcount) static int n_loops = 100; #if defined (ACE_TEMPLATES_REQUIRE_SPECIALIZATION) -template class ACE_Atomic_Op<ACE_Thread_Mutex, int>; template class ACE_Future<const char *>; template class ACE_Future<u_long>; #endif /* ACE_TEMPLATES_REQUIRE_SPECIALIZATION */ diff --git a/tests/Makefile b/tests/Makefile index e1e500bb8f6..4b1cbba8335 100644 --- a/tests/Makefile +++ b/tests/Makefile @@ -26,6 +26,7 @@ BIN = Barrier_Test \ Pipe_Test \ Reactors_Test \ Reactor_Exceptions_Test \ + Reactor_Notify_Test \ Reactor_Timer_Test \ Reader_Writer_Test \ Recursive_Mutex_Test \ diff --git a/tests/Reactor_Notify_Test.cpp b/tests/Reactor_Notify_Test.cpp new file mode 100644 index 00000000000..89bf1629801 --- /dev/null +++ b/tests/Reactor_Notify_Test.cpp @@ -0,0 +1,204 @@ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// tests +// +// = FILENAME +// Reactors_Test.cpp +// +// = DESCRIPTION +// This is a test that illustrates how the Reactor's notify() +// method works under various settings of max_notify_iterations(). +// +// = AUTHOR +// Douglas C. Schmidt +// +// ============================================================================ + +#include "ace/Synch.h" +#include "ace/Service_Config.h" +#include "ace/Task.h" +#include "test_config.h" + +#if defined (ACE_HAS_THREADS) + +class Supplier_Task : public ACE_Task<ACE_MT_SYNCH> +{ +public: + Supplier_Task (void); + // Constructor. + + ~Supplier_Task (void); + // Destructor. + + virtual int open (void * = 0); + // Make this an Active Object. + + virtual int close (u_long); + // Close down the supplier. + + virtual int svc (void); + // Generates events and sends them to the <Reactor>'s <notify> + // method. + + virtual int handle_exception (ACE_HANDLE); + // Releases the <waiter_> semaphore when called by the <Reactor>'s + // notify handler. + + virtual int handle_output (ACE_HANDLE); + // Called every time through the main Reactor event loop to + // illustrate the difference between "limited" and "unlimited" + // notification. + +private: + ACE_Thread_Semaphore waiter_; + // Used to hand-shake between the <Supplier_Task> and the + // <Reactor>'s notify mechanism. + + ACE_Pipe pipe_; + // We use this pipe just so we can get a handle that is always + // "active." +}; + +Supplier_Task::Supplier_Task (void) + : waiter_ (0) // Make semaphore "locked" by default. +{ +} + +int +Supplier_Task::open (void *) +{ + if (this->pipe_.open () == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "open failed"), -1); + else if (ACE_Service_Config::reactor ()->register_handler + (this->pipe_.write_handle (), this, ACE_Event_Handler::WRITE_MASK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "register_handler failed"), -1); + else + // Make this an Active Object. + return this->activate (THR_BOUND); +} + +int +Supplier_Task::close (u_long) +{ + ACE_DEBUG ((LM_DEBUG, "(%t) Supplier_Task::close\n")); + + if (ACE_Service_Config::reactor ()->remove_handler + (this->pipe_.write_handle (), ACE_Event_Handler::WRITE_MASK) == -1) + ACE_ERROR ((LM_ERROR, "(%t) %p\n", "remove_handler failed")); + return 0; +} + +Supplier_Task::~Supplier_Task (void) +{ + ACE_DEBUG ((LM_DEBUG, "(%t) ~Supplier_Task\n")); +} + +int +Supplier_Task::svc (void) +{ + // ACE_NEW_THREAD; + + size_t i; + + ACE_DEBUG ((LM_DEBUG, "(%t) **** starting unlimited notifications test\n")); + + // Allow an unlimited number of iterations per + // <ACE_Reactor::notify>. + ACE_Service_Config::reactor ()->max_notify_iterations (-1); + + for (i = 0; i < ACE_MAX_ITERATIONS; i++) + { + ACE_DEBUG ((LM_DEBUG, "(%t) notifying reactor\n")); + // Notify the Reactor. + if (ACE_Service_Config::reactor ()->notify (this) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "notify"), -1); + + // Wait for our <handle_exception> method to release the + // semaphore. + else if (this->waiter_.acquire () == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "acquire"), -1); + } + + ACE_DEBUG ((LM_DEBUG, "(%t) **** starting limited notifications test\n")); + + // Only allow 1 iteration per <ACE_Reactor::notify> + ACE_Service_Config::reactor ()->max_notify_iterations (1); + + for (i = 0; i < ACE_MAX_ITERATIONS; i++) + { + ACE_DEBUG ((LM_DEBUG, "(%t) notifying reactor\n")); + // Notify the Reactor. + if (ACE_Service_Config::reactor ()->notify (this) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "notify"), -1); + + // Wait for our <handle_exception> method to release the + // semaphore. + else if (this->waiter_.acquire () == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "acquire"), -1); + } + + ACE_DEBUG ((LM_DEBUG, "(%t) **** exiting thread test\n")); + return 0; +} + +int +Supplier_Task::handle_exception (ACE_HANDLE handle) +{ + ACE_ASSERT (handle == ACE_INVALID_HANDLE); + ACE_DEBUG ((LM_DEBUG, "(%t) handle_exception\n")); + + this->waiter_.release (); + return 0; +} + +int +Supplier_Task::handle_output (ACE_HANDLE handle) +{ + ACE_ASSERT (handle == this->pipe_.write_handle ()); + ACE_DEBUG ((LM_DEBUG, "(%t) handle_output\n")); + return 0; +} + +#endif /* ACE_HAS_THREADS */ + +int +main (int, char *[]) +{ + // ACE_START_TEST ("Reactors_Test"); + +#if defined (ACE_HAS_THREADS) + ACE_Service_Config daemon; + ACE_ASSERT (ACE_LOG_MSG->op_status () != -1); + + Supplier_Task task; + ACE_ASSERT (ACE_LOG_MSG->op_status () != -1); + + if (task.open () == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) open failed\n"), -1); + else + { + ACE_Time_Value timeout (2); + + for (;;) + { + // Use a timeout to inform the Reactor when to shutdown. + switch (ACE_Service_Config::reactor ()->handle_events (timeout)) + { + case -1: + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "reactor"), -1); + /* NOTREACHED */ + case 0: + ACE_ERROR_RETURN ((LM_ERROR, "(%t) timeout\n"), 0); + /* NOTREACHED */ + } + } + } +#else + ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n")); +#endif /* ACE_HAS_THREADS */ + // ACE_END_TEST; + return 0; +} diff --git a/tests/Reactors_Test.cpp b/tests/Reactors_Test.cpp index 5a30488f5cd..18ad9f3e4d7 100644 --- a/tests/Reactors_Test.cpp +++ b/tests/Reactors_Test.cpp @@ -17,7 +17,6 @@ // // ============================================================================ -#include "ace/Reactor.h" #include "ace/Synch.h" #include "ace/Service_Config.h" #include "ace/Task.h" @@ -25,7 +24,6 @@ #if defined (ACE_HAS_THREADS) -static const int NUM_INVOCATIONS = 10; static const int MAX_TASKS = 20; class Test_Task : public ACE_Task<ACE_MT_SYNCH> @@ -100,7 +98,7 @@ Test_Task::svc (void) { ACE_NEW_THREAD; - for (int i = 0; i < NUM_INVOCATIONS; i++) + for (int i = 0; i < ACE_MAX_ITERATIONS; i++) { ACE_OS::thr_yield (); @@ -130,7 +128,7 @@ Test_Task::handle_input (ACE_HANDLE) { this->handled_++; - if (this->handled_ == NUM_INVOCATIONS) + if (this->handled_ == ACE_MAX_ITERATIONS) { done_count--; ACE_DEBUG ((LM_DEBUG, @@ -158,6 +156,7 @@ worker (void *args) for (;;) { + // Use a timeout to inform the Reactor when to shutdown. switch (reactor->handle_events (timeout)) { case -1: @@ -172,10 +171,6 @@ worker (void *args) return 0; } -#if defined (ACE_TEMPLATES_REQUIRE_SPECIALIZATION) -template class ACE_Atomic_Op<ACE_Thread_Mutex, int>; -#endif /* ACE_TEMPLATES_REQUIRE_SPECIALIZATION */ - #endif /* ACE_HAS_THREADS */ int diff --git a/tests/Reader_Writer_Test.cpp b/tests/Reader_Writer_Test.cpp index cdd70035634..74c2a154c24 100644 --- a/tests/Reader_Writer_Test.cpp +++ b/tests/Reader_Writer_Test.cpp @@ -144,10 +144,6 @@ writer (void *) return 0; } -#if defined (ACE_TEMPLATES_REQUIRE_SPECIALIZATION) -template class ACE_Atomic_Op<ACE_Thread_Mutex, int>; -#endif /* ACE_TEMPLATES_REQUIRE_SPECIALIZATION */ - #endif /* ACE_HAS_THREADS */ // Spawn off threads. diff --git a/tests/run_tests.sh b/tests/run_tests.sh index 08c90d9a463..135d7058855 100755 --- a/tests/run_tests.sh +++ b/tests/run_tests.sh @@ -60,6 +60,7 @@ run Thread_Pool_Test # uses Thread_Manager, Task run Future_Test # uses Thread_Manager, Task run Reactors_Test # uses Task, Mutex, Reactor run Reactor_Exceptions_Test # uses Reactor and C++ exceptions +run Reactor_Notify_Test # uses Reactor's notify() method, Task run Reactor_Timer_Test # uses Event_Handler, Reactor run Reader_Writer_Test # uses Thread_Manager, Mutex run SOCK_Test # uses Thread_Manager, SOCK_SAP |