summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorschmidt <douglascraigschmidt@users.noreply.github.com>1997-02-17 01:02:17 +0000
committerschmidt <douglascraigschmidt@users.noreply.github.com>1997-02-17 01:02:17 +0000
commit83379a56bcf820a1b7b9974ec89c59915f1816d8 (patch)
treedb2393337b28f0d06437fd2da365060a11a2e112
parentc685695d30d92cc376ab180641e5faf73cd2b8c1 (diff)
downloadATCD-83379a56bcf820a1b7b9974ec89c59915f1816d8.tar.gz
foo
-rw-r--r--ChangeLog-97a74
-rw-r--r--README2
-rw-r--r--ace/ACE.cpp8
-rw-r--r--ace/Log_Msg.h2
-rw-r--r--ace/OS.i2
-rw-r--r--ace/Reactor.cpp381
-rw-r--r--ace/Reactor.h67
-rw-r--r--ace/ReactorEx.cpp275
-rw-r--r--ace/ReactorEx.h58
-rw-r--r--ace/SOCK_Stream.h23
-rw-r--r--ace/SOCK_Stream.i2
-rw-r--r--ace/TTY_IO.cpp8
-rw-r--r--apps/Gateway/Gateway/proxy_config4
-rw-r--r--apps/Gateway/Gateway/svc.conf2
-rw-r--r--examples/Reactor/Misc/test_signals_1.cpp23
-rw-r--r--examples/Reactor/ReactorEx/test_MT.cpp15
-rw-r--r--examples/Reactor/WFMO_Reactor/test_MT.cpp15
-rw-r--r--tests/Future_Test.cpp1
-rw-r--r--tests/Makefile1
-rw-r--r--tests/Reactor_Notify_Test.cpp204
-rw-r--r--tests/Reactors_Test.cpp11
-rw-r--r--tests/Reader_Writer_Test.cpp4
-rwxr-xr-xtests/run_tests.sh1
23 files changed, 910 insertions, 273 deletions
diff --git a/ChangeLog-97a b/ChangeLog-97a
index bff681bda09..ee2c167a18a 100644
--- a/ChangeLog-97a
+++ b/ChangeLog-97a
@@ -1,3 +1,77 @@
+Sun Feb 16 12:23:23 1997 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu>
+
+ * ace/Reactor.cpp: Totally rewrote the ACE_Reactor's dispatching
+ mechanism so that it now keeps track of whether the state of the
+ wait_set_ has changed during a dispatch (i.e., whenever
+ register_handler_i() or remove_handler_i() is called). If the
+ wait_set_ state *has* changed, then we bail out and rerun
+ select() in order to get the latest changes.
+
+ * ace/Reactor.cpp: Changed the implementation of the
+ ACE_Reactor_Notify class so that (1) it short-circuits a trip
+ through the ACE_Reactor::notify_handle() method (after all, it's
+ just going to call its own handle_input() method back) and (2)
+ the ACE_Reactor_Notify::handle_input() method now returns a
+ count of the number of handlers that it dispatched.
+
+ * ace/Log_Msg.h: Added a (%P|%t) so that we now print out the
+ process id and thread number for failed ACE_ASSERT() calls.
+
+ * tests: Removed the unnecessary template specializations of
+ ACE_Atomic_Op<ACE_Thread_Mutex, int> since this is already done
+ in libACE.
+
+ * ace/Reactor.cpp: Removed the #ifdef preventing the enabling of
+ non-blocking mode for the recv() side of the Reactor's
+ notification pipe (socket) for Win32. I believe that with the
+ new max_notify_iterations scheme we should be all set.
+
+ * ace/ReactorEx.cpp: Added an identical API for bounding the
+ max_notify_iterations() for ReactorEx.
+
+ * ace/Reactor.cpp: Enhanced the Reactor's notify() mechanism so
+ that it is now possible to set the max_notify_iterations(),
+ which limits the number of times that the
+ ACE_Reactor_Notify::handle_input() method will iterate and
+ dispatch the ACE_Event_Handlers that are passed in via the
+ notify pipe before breaking out of its recv() loop. This is
+ necessary to keep from starving out other Event_Handlers.
+ Thanks to Rod Skinner <rods@in.ot.com.au> for pointing out the
+ need for this.
+
+ * ace/Reactor.cpp: Fixed a bug in the WIN32
+ ACE_Reactor_Notify::handle_input() logic. We were calling
+ requeue_position(0) when we should have been calling renew().
+
+Sat Feb 15 11:46:39 1997 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu>
+
+ * ace/ACE.cpp: It appears that VxWorks doesn't support fcntl().
+ However, it does seem to support ioctl(). Therefore, that's how
+ we'll set the descriptors into non-blocking mode. Thanks to
+ Dave Mayerhoefer <m210229@svappl36.mdc.com> for reporting this.
+
+ * ace/SOCK_Stream.h: Corrected the documentation for
+ ACE_SOCK_Stream::recv_n (void *buf, size_t len, int flags, const
+ ACE_Time_Value *timeout). Thanks to Paul Roman
+ <proman@npac.syr.edu> for reporting this.
+
+ * ace/SOCK_Stream.i (recv_n): Fixed a minor bug in the
+ SOCK_Stream.i line 38:
+
+ ACE_TRACE ("ACE_SOCK_Stream::send_n");
+
+ should be
+
+ ACE_TRACE ("ACE_SOCK_Stream::recv_n");
+
+ Thanks to Paul Roman <proman@npac.syr.edu> for reporting this.
+
+Fri Feb 14 00:40:14 1997 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu>
+
+ * ace/TTY_IO.cpp (control): Moved the drop out timer value from
+ the ACE struct to the WinNT struct. Thanks to Brad Flood
+ <BFLOOD@tcs.lmco.com> for this fix.
+
Thu Feb 13 21:24:17 1997 <irfan@cha-cha.cs.wustl.edu>
* ace/OS.i (event_wait): Fixed the missing check for success.
diff --git a/README b/README
index 0b13abf784b..a62d16de945 100644
--- a/README
+++ b/README
@@ -473,6 +473,8 @@ Michael Maxie <maxie@acm.org>
John Cosby <John.D.Cosby@cpmx.saic.com>
Nigel Owen <Nigel@voicelink.co.nz>
Jorn Jensen <jornj@funcom.com>
+Paul Roman <proman@npac.syr.edu>
+Dave Mayerhoefer <m210229@svappl36.mdc.com>
I would particularly like to thank Paul Stephenson, who worked with me
at Ericsson and is now at ObjectSpace. Paul devised the recursive
diff --git a/ace/ACE.cpp b/ace/ACE.cpp
index 96246b2886c..79b6ccbba59 100644
--- a/ace/ACE.cpp
+++ b/ace/ACE.cpp
@@ -942,7 +942,7 @@ int
ACE::set_flags (ACE_HANDLE handle, int flags)
{
ACE_TRACE ("ACE::set_flags");
-#if defined (ACE_WIN32)
+#if defined (ACE_WIN32) || defined (VXWORKS)
switch (flags)
{
case ACE_NONBLOCK:
@@ -969,7 +969,7 @@ ACE::set_flags (ACE_HANDLE handle, int flags)
return -1;
else
return 0;
-#endif /* ACE_WIN32 */
+#endif /* ACE_WIN32 && VXWORKS */
}
// Flags are the file status flags to turn off.
@@ -979,7 +979,7 @@ ACE::clr_flags (ACE_HANDLE handle, int flags)
{
ACE_TRACE ("ACE::clr_flags");
-#if defined (ACE_WIN32)
+#if defined (ACE_WIN32) || defined (VXWORKS)
switch (flags)
{
case ACE_NONBLOCK:
@@ -1006,7 +1006,7 @@ ACE::clr_flags (ACE_HANDLE handle, int flags)
return -1;
else
return 0;
-#endif /* ACE_WIN32 */
+#endif /* ACE_WIN32 || VXWORKS */
}
int
diff --git a/ace/Log_Msg.h b/ace/Log_Msg.h
index 3dc046aed3e..09bbc6c328e 100644
--- a/ace/Log_Msg.h
+++ b/ace/Log_Msg.h
@@ -28,7 +28,7 @@
#define ACE_ASSERT(X) \
do { if(!(X)) { int __ace_error = errno; ACE_Log_Msg *ace___ = ACE_Log_Msg::instance (); \
ace___->set (__FILE__, __LINE__, -1, __ace_error, ace___->restart (), ace___->msg_ostream ()); \
- ace___->log (LM_ERROR, "ACE_ASSERT: file %N, line %l assertion failed for '%s'.%a\n", #X, -1); \
+ ace___->log (LM_ERROR, "(%P|%t) ACE_ASSERT: file %N, line %l assertion failed for '%s'.%a\n", #X, -1); \
} } while (0)
#endif /* ACE_NDEBUG */
diff --git a/ace/OS.i b/ace/OS.i
index 0644da3f697..b713ec01d4e 100644
--- a/ace/OS.i
+++ b/ace/OS.i
@@ -5882,7 +5882,7 @@ ACE_OS::ioctl (ACE_HANDLE handle, int cmd, void *val)
ACE_SOCKET sock = (ACE_SOCKET) handle;
ACE_SOCKCALL_RETURN (::ioctlsocket (sock, cmd, (u_long *) val), int, -1);
#elif defined (VXWORKS)
- // this may not work very well...
+ // This may not work very well...
ACE_OSCALL_RETURN (ACE_ADAPT_RETVAL (::ioctl (handle, cmd, (int) val), ace_result_),
int, -1);
#else
diff --git a/ace/Reactor.cpp b/ace/Reactor.cpp
index e723529a776..d94e561d226 100644
--- a/ace/Reactor.cpp
+++ b/ace/Reactor.cpp
@@ -249,6 +249,11 @@ ACE_Reactor_Handler_Repository::bind (ACE_HANDLE handle,
this->reactor_.wait_set_,
ACE_Reactor::ADD_MASK);
+ // Note the fact that we've changed the state of the <wait_set_>,
+ // which is used by the dispatching loop to determine whether it can
+ // keep going or if it needs to reconsult select().
+ this->reactor_.state_changed_ = 1;
+
// Assign *this* <Reactor> to the <Event_Handler>.
event_handler->reactor (&this->reactor_);
return 0;
@@ -274,6 +279,11 @@ ACE_Reactor_Handler_Repository::unbind (ACE_HANDLE handle,
this->reactor_.wait_set_,
ACE_Reactor::CLR_MASK);
+ // Note the fact that we've changed the state of the <wait_set_>,
+ // which is used by the dispatching loop to determine whether it can
+ // keep going or if it needs to reconsult select().
+ this->reactor_.state_changed_ = 1;
+
// Close down the <Event_Handler> unless we've been instructed not
// to.
if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::DONT_CALL) == 0)
@@ -483,7 +493,12 @@ ACE_Reactor::requeue_position (int rp)
{
ACE_TRACE ("ACE_Reactor::requeue_position");
ACE_MT (ACE_GUARD (ACE_REACTOR_MUTEX, ace_mon, this->token_));
+#if defined (ACE_WIN32)
+ // Must always requeue ourselves "next" on Win32.
+ this->requeue_position_ = 0;
+#else
this->requeue_position_ = rp;
+#endif /* ACE_WIN32 */
}
int
@@ -494,6 +509,33 @@ ACE_Reactor::requeue_position (void)
return this->requeue_position_;
}
+void
+ACE_Reactor::max_notify_iterations (int iterations)
+{
+ ACE_TRACE ("ACE_Reactor::max_notify_iterations");
+ ACE_MT (ACE_GUARD (ACE_REACTOR_MUTEX, ace_mon, this->token_));
+
+#if defined (ACE_WIN32)
+ // There seems to be a Win32 bug with non-blocking mode, so we'll
+ // always just read one notification at a time.
+ iterations = 1;
+#else
+ // Must always be > 0 or < 0 to optimize the loop exit condition.
+ if (iterations == 0)
+ iterations = 1;
+#endif /* ACE_WIN32 */
+
+ this->max_notify_iterations_ = iterations;
+}
+
+int
+ACE_Reactor::max_notify_iterations (void)
+{
+ ACE_TRACE ("ACE_Reactor::max_notify_iterations");
+ ACE_MT (ACE_GUARD_RETURN (ACE_REACTOR_MUTEX, ace_mon, this->token_, -1));
+ return this->max_notify_iterations_;
+}
+
#if defined (ACE_MT_SAFE)
// Enqueue ourselves into the list of waiting threads.
void
@@ -553,13 +595,12 @@ ACE_Reactor_Notify::open (ACE_Reactor *r)
if (this->notification_pipe_.open () == -1)
return -1;
-#if !defined (ACE_WIN32) // There seems to be a Win32 bug with this...
- // Set this into non-blocking mode.
+ // There seems to be a Win32 bug with this... Set this into
+ // non-blocking mode.
if (ACE::set_flags (this->notification_pipe_.read_handle (),
ACE_NONBLOCK) == -1)
return -1;
else
-#endif /* !ACE_WIN32 */
return this->reactor_->register_handler
(this->notification_pipe_.read_handle (),
this,
@@ -593,7 +634,8 @@ ACE_Reactor_Notify::notify (ACE_Event_Handler *eh,
// Reactor.
int
-ACE_Reactor_Notify::handle_notifications (const ACE_Handle_Set &rd_mask)
+ACE_Reactor_Notify::dispatch_notifications (int &number_of_active_handles,
+ const ACE_Handle_Set &rd_mask)
{
ACE_TRACE ("ACE_Reactor_Notify::handle_notification");
@@ -602,13 +644,8 @@ ACE_Reactor_Notify::handle_notifications (const ACE_Handle_Set &rd_mask)
if (rd_mask.is_set (read_handle))
{
- this->reactor_->notify_handle
- (read_handle,
- ACE_Event_Handler::READ_MASK,
- this->reactor_->ready_set_.rd_mask_,
- this->reactor_->handler_rep_.find (read_handle),
- &ACE_Event_Handler::handle_input);
- return 1;
+ number_of_active_handles--;
+ return this->handle_input (read_handle);
}
else
return 0;
@@ -628,18 +665,9 @@ ACE_Reactor_Notify::handle_input (ACE_HANDLE handle)
ACE_Notification_Buffer buffer;
ssize_t n;
+ int number_dispatched = 0;
-#if defined (ACE_WIN32)
- n = ACE::recv (handle, (char *) &buffer, sizeof buffer);
-
- if (n == -1)
- return -1;
-
- // Put ourselves at the head of the queue.
- this->reactor_->requeue_position (0);
-#else
while ((n = ACE::recv (handle, (char *) &buffer, sizeof buffer)) != -1)
-#endif /* ACE_WIN32 */
{
// If eh == 0 then another thread is unblocking the ACE_Reactor
// to update the ACE_Reactor's internal structures. Otherwise,
@@ -669,16 +697,38 @@ ACE_Reactor_Notify::handle_input (ACE_HANDLE handle)
buffer.eh_->handle_close (ACE_INVALID_HANDLE,
ACE_Event_Handler::EXCEPT_MASK);
}
+
+ number_dispatched++;
+
+ // Bail out if we've reached the <notify_threshold_>. Note that
+ // by default <notify_threshold_> is -1, so we'll loop until all
+ // the notifications in the pipe have been dispatched.
+ if (number_dispatched == this->reactor_->max_notify_iterations_)
+ break;
}
// Enqueue ourselves into the list of waiting threads. When we
// reacquire the token we'll be off and running again with ownership
- // of the token.
+ // of the token. The postcondition of this call is that
+ // this->reactor_.token_.current_owner () == ACE_Thread::self ();
this->reactor_->renew ();
- // Postcondition: this->reactor_.token_.current_owner () ==
- // ACE_Thread::self ();
- return n == -1 && errno != EWOULDBLOCK ? -1 : 0;
+ if (n == -1)
+ {
+ if (errno != EWOULDBLOCK)
+ {
+ // If we're returning -1 here something is seriously wrong!
+ ACE_ASSERT (!"something's totally wrong!\n");
+ return -1;
+ }
+ else
+ {
+ ACE_DEBUG ((LM_DEBUG, "(%t) ++++ WOULD BLOCK +++++\n"));
+ return number_dispatched;
+ }
+ }
+ else
+ return number_dispatched;
}
#endif /* ACE_MT_SAFE */
@@ -899,8 +949,15 @@ ACE_Reactor::ACE_Reactor (ACE_Sig_Handler *sh,
timer_queue_ (0),
delete_timer_queue_ (0),
delete_signal_handler_ (0),
+#if defined (ACE_WIN32)
+ requeue_position_ (0), // Must always requeue ourselves "next" on Win32.
+ max_notify_iterations_ (1),
+#else
requeue_position_ (-1), // Requeue at end of waiters by default.
- initialized_ (0)
+ max_notify_iterations_ (-1),
+#endif /* ACE_WIN32 */
+ initialized_ (0),
+ state_changed_ (0)
#if defined (ACE_MT_SAFE)
, token_ (*this)
#endif /* ACE_MT_SAFE */
@@ -921,8 +978,15 @@ ACE_Reactor::ACE_Reactor (size_t size,
timer_queue_ (0),
delete_timer_queue_ (0),
delete_signal_handler_ (0),
+#if defined (ACE_WIN32)
+ requeue_position_ (0), // Must always requeue ourselves "next" on Win32.
+ max_notify_iterations_ (1),
+#else
requeue_position_ (-1), // Requeue at end of waiters by default.
- initialized_ (0)
+ max_notify_iterations_ (-1),
+#endif /* ACE_WIN32 */
+ initialized_ (0),
+ state_changed_ (0)
#if defined (ACE_MT_SAFE)
, token_ (*this)
#endif /* ACE_MT_SAFE */
@@ -1328,6 +1392,121 @@ ACE_Reactor::wait_for_multiple_events (ACE_Reactor_Handle_Set &dispatch_set,
}
int
+ACE_Reactor::dispatch_timer_handlers (void)
+{
+ int number_dispatched = this->timer_queue_->expire ();
+ return this->state_changed_ ? -1 : number_dispatched;
+}
+
+int
+ACE_Reactor::dispatch_notification_handlers (int &number_of_active_handles,
+ ACE_Reactor_Handle_Set &dispatch_set)
+{
+#if defined (ACE_MT_SAFE)
+ // Check to see if the ACE_HANDLE associated with the Reactor's
+ // notify hook is enabled. If so, it means that one or more
+ // other threads are trying to update the ACE_Reactor's internal
+ // tables. We'll handle all these threads and then break out to
+ // continue the event loop.
+
+ int number_dispatched =
+ this->notify_handler_.dispatch_notifications (number_of_active_handles,
+ dispatch_set.rd_mask_);
+ return this->state_changed_ ? -1 : number_dispatched;
+#else
+ return 0;
+#endif /* ACE_MT_SAFE */
+}
+
+int
+ACE_Reactor::dispatch_output_handlers (int &number_of_active_handles,
+ ACE_Reactor_Handle_Set &dispatch_set)
+{
+ ACE_HANDLE handle;
+
+ int number_dispatched = 0;
+
+ if (number_of_active_handles > 0)
+ {
+ // Handle output events (this code needs to come first
+ // to handle the obscure case of piggy-backed data
+ // coming along with the final handshake message of a
+ // nonblocking connection).
+
+ for (ACE_Handle_Set_Iterator handle_iter_wr (dispatch_set.wr_mask_);
+ (handle = handle_iter_wr ()) != ACE_INVALID_HANDLE
+ && number_dispatched < number_of_active_handles;
+ ++handle_iter_wr)
+ {
+ number_dispatched++;
+ this->notify_handle (handle,
+ ACE_Event_Handler::WRITE_MASK,
+ this->ready_set_.wr_mask_,
+ this->handler_rep_.find (handle),
+ &ACE_Event_Handler::handle_output);
+ }
+ }
+
+ number_of_active_handles -= number_dispatched;
+ return this->state_changed_ ? -1 : number_dispatched;
+}
+
+int
+ACE_Reactor::dispatch_exception_handlers (int &number_of_active_handles,
+ ACE_Reactor_Handle_Set &dispatch_set)
+{
+ ACE_HANDLE handle;
+
+ int number_dispatched = 0;
+
+ if (number_of_active_handles > 0)
+ {
+ // Handle "exceptional" events.
+ for (ACE_Handle_Set_Iterator handle_iter_ex (dispatch_set.ex_mask_);
+ (handle = handle_iter_ex ()) != ACE_INVALID_HANDLE
+ && number_dispatched < number_of_active_handles;
+ ++handle_iter_ex)
+ {
+ this->notify_handle (handle,
+ ACE_Event_Handler::EXCEPT_MASK,
+ this->ready_set_.ex_mask_,
+ this->handler_rep_.find (handle),
+ &ACE_Event_Handler::handle_exception);
+ number_dispatched++;
+ }
+ }
+
+ number_of_active_handles -= number_dispatched;
+ return this->state_changed_ ? -1 : number_dispatched;
+}
+
+int
+ACE_Reactor::dispatch_input_handlers (int &number_of_active_handles,
+ ACE_Reactor_Handle_Set &dispatch_set)
+{
+ ACE_HANDLE handle;
+
+ int number_dispatched = 0;
+
+ if (number_of_active_handles > 0)
+ {
+ // Handle input, passive connection, and shutdown events.
+ for (ACE_Handle_Set_Iterator handle_iter_rd (dispatch_set.rd_mask_);
+ (handle = handle_iter_rd ()) != ACE_INVALID_HANDLE
+ && number_dispatched < number_of_active_handles;
+ ++handle_iter_rd)
+ this->notify_handle (handle,
+ ACE_Event_Handler::READ_MASK,
+ this->ready_set_.rd_mask_,
+ this->handler_rep_.find (handle),
+ &ACE_Event_Handler::handle_input);
+ }
+
+ number_of_active_handles -= number_dispatched;
+ return this->state_changed_ ? -1 : number_dispatched;
+}
+
+int
ACE_Reactor::dispatch (int number_of_active_handles,
ACE_Reactor_Handle_Set &dispatch_set)
{
@@ -1335,90 +1514,94 @@ ACE_Reactor::dispatch (int number_of_active_handles,
int number_of_handlers_dispatched = 0;
- for (;;)
+ // Keep dispatching while there are still active handles left. Note
+ // that the only way we should ever iterate more than once through
+ // this loop is if signals occur while we're dispatching other
+ // handlers.
+ //
+ // Note that we keep track of changes to our state. If any of the
+ // dispatch_*() methods below return -1 it means that the
+ // <wait_set_> state has changed as the result of an
+ // <ACE_Event_Handler> being dispatched. This means that we need to
+ // bail out and rerun the select() loop since our existing notion of
+ // handles in <dispatch_set> may no longer be correct.
+ //
+ // In the beginning, our state starts out unchanged. After every
+ // iteration (i.e., due to signals), our state again starts out
+ // unchanged.
+
+ for (this->state_changed_ = 0;
+ number_of_active_handles > 0;
+ this->state_changed_ = 0)
{
- number_of_handlers_dispatched += number_of_active_handles;
-
- // Handle timers first since they may have higher latency
- // constraints...
-
- number_of_handlers_dispatched += this->timer_queue_->expire ();
+ // Perform the Template Method for dispatching all the handlers.
+ // We use a loop here just to simplify the "breaking out" if we
+ // need to stop early.
-#if defined (ACE_MT_SAFE)
- // Check to see if the notify ACE_HANDLE is enabled. If so, it
- // means that one or more other threads are trying to update the
- // ACE_Reactor's internal tables. We'll handle all these
- // threads and then break out to continue the event loop.
-
- if (this->notify_handler_.handle_notifications (dispatch_set.rd_mask_) == 0)
-#endif /* ACE_MT_SAFE */
+ do
{
- ACE_HANDLE handle;
-
- if (number_of_active_handles > 0)
- {
- // Handle output events (this code needs to come first
- // to handle the obscure case of piggy-backed data
- // coming along with the final handshake message of a
- // nonblocking connection).
-
- for (ACE_Handle_Set_Iterator handle_iter_wr (dispatch_set.wr_mask_);
- (handle = handle_iter_wr ()) != ACE_INVALID_HANDLE
- && --number_of_active_handles >= 0;
- ++handle_iter_wr)
- this->notify_handle (handle,
- ACE_Event_Handler::WRITE_MASK,
- this->ready_set_.wr_mask_,
- this->handler_rep_.find (handle),
- &ACE_Event_Handler::handle_output);
- }
-
- if (number_of_active_handles > 0)
- {
- // Handle "exceptional" events.
- for (ACE_Handle_Set_Iterator handle_iter_ex (dispatch_set.ex_mask_);
- (handle = handle_iter_ex ()) != ACE_INVALID_HANDLE
- && --number_of_active_handles >= 0;
- ++handle_iter_ex)
- this->notify_handle (handle,
- ACE_Event_Handler::EXCEPT_MASK,
- this->ready_set_.ex_mask_,
- this->handler_rep_.find (handle),
- &ACE_Event_Handler::handle_exception);
- }
-
- if (number_of_active_handles > 0)
- {
- // Handle input, passive connection, and shutdown events.
- for (ACE_Handle_Set_Iterator handle_iter_rd (dispatch_set.rd_mask_);
- (handle = handle_iter_rd ()) != ACE_INVALID_HANDLE
- && --number_of_active_handles >= 0;
- ++handle_iter_rd)
- this->notify_handle (handle,
- ACE_Event_Handler::READ_MASK,
- this->ready_set_.rd_mask_,
- this->handler_rep_.find (handle),
- &ACE_Event_Handler::handle_input);
- }
+ int result;
+
+ // Handle timers first since they may have higher latency
+ // constraints.
+
+ result = this->dispatch_timer_handlers ();
+ if (result == -1)
+ break; // State has changed, exit inner loop.
+ else
+ number_of_handlers_dispatched += result;
+
+ result = this->dispatch_notification_handlers (number_of_active_handles,
+ dispatch_set);
+ if (result == -1)
+ break; // State has changed, exit inner loop.
+ else
+ number_of_handlers_dispatched += result;
+
+ result = this->dispatch_output_handlers (number_of_active_handles,
+ dispatch_set);
+
+ if (result <= 0)
+ // State has changed or there are no more handles to
+ // dispatch, exit inner loop.
+ break;
+ else
+ number_of_handlers_dispatched += result;
+
+ result = this->dispatch_input_handlers (number_of_active_handles,
+ dispatch_set);
+
+ if (result <= 0)
+ // State has changed or there are no more handles to
+ // dispatch, exit inner loop.
+ break;
+ else
+ number_of_handlers_dispatched += result;
+
+ result = this->dispatch_exception_handlers (number_of_active_handles,
+ dispatch_set);
+
+ if (result <= 0)
+ // State has changed or there are no more handles to
+ // dispatch, exit inner loop.
+ break;
+ else
+ number_of_handlers_dispatched += result;
}
+ while (0);
// If any HANDLES are activated as a result of signals they
// should be dispatched since they may be time critical...
if (ACE_Sig_Handler::sig_pending () != 0)
- {
- ACE_Sig_Handler::sig_pending (0);
+ {
+ ACE_Sig_Handler::sig_pending (0);
number_of_active_handles = this->any_ready (dispatch_set);
-
- if (number_of_active_handles > 0)
- // Loop back around again and redispatch activated
- // handlers.
- continue;
- }
-
- return number_of_handlers_dispatched;
+ }
}
+
+ return number_of_handlers_dispatched;
}
int
diff --git a/ace/Reactor.h b/ace/Reactor.h
index 6a04175a489..17a78fcab69 100644
--- a/ace/Reactor.h
+++ b/ace/Reactor.h
@@ -73,7 +73,8 @@ public:
int open (ACE_Reactor *);
int close (void);
- int handle_notifications (const ACE_Handle_Set &rd_mask);
+ int dispatch_notifications (int &number_of_active_handles,
+ const ACE_Handle_Set &rd_mask);
// Handles pending threads (if any) that are waiting to unblock the
// Reactor.
@@ -515,6 +516,22 @@ public:
// Get position that the main ACE_Reactor thread is requeued in the
// list of waiters during a notify() callback.
+ void max_notify_iterations (int);
+ // Set the maximum number of times that the
+ // <ACE_Reactor_Notify::handle_input> method will iterate and
+ // dispatch the <ACE_Event_Handlers> that are passed in via the
+ // notify pipe before breaking out of its <recv> loop. By default,
+ // this is set to -1, which means "iterate until the pipe is empty."
+ // Setting this to a value like "1 or 2" will increase "fairness"
+ // (and thus prevent starvation) at the expense of slightly higher
+ // dispatching overhead.
+
+ int max_notify_iterations (void);
+ // Get the maximum number of times that the
+ // <ACE_Reactor_Notify::handle_input> method will iterate and
+ // dispatch the <ACE_Event_Handlers> that are passed in via the
+ // notify pipe before breaking out of its <recv> loop.
+
// = Low-level wait_set mask manipulation methods.
virtual int mask_ops (ACE_Event_Handler *eh,
ACE_Reactor_Mask mask,
@@ -630,6 +647,8 @@ protected:
ACE_Time_Value *);
// Wait for events to occur.
+ // = Dispatching methods.
+
virtual int dispatch (int nfound,
ACE_Reactor_Handle_Set &);
// Template Method that dispatches <ACE_Event_Handler>s for time
@@ -637,6 +656,35 @@ protected:
// of <ACE_Event_Handler>s that were dispatched or -1 if something
// goes wrong.
+ virtual int dispatch_timer_handlers (void);
+ // Dispatch any expired timer handlers. Returns -1 if the state of
+ // the <wait_set_> has changed, else returns number of timer
+ // handlers dispatched.
+
+ virtual int dispatch_notification_handlers (int &number_of_active_handles,
+ ACE_Reactor_Handle_Set &dispatch_set);
+ // Dispatch any notification handlers. Returns -1 if the state of
+ // the <wait_set_> has changed, else returns number of handlers
+ // notified.
+
+ virtual int dispatch_output_handlers (int &number_of_active_handles,
+ ACE_Reactor_Handle_Set &dispatch_set);
+ // Dispatch any output handlers. Returns -1 if the state of the
+ // <wait_set_> has changed, else returns number of handlers
+ // dispatched.
+
+ virtual int dispatch_exception_handlers (int &number_of_active_handles,
+ ACE_Reactor_Handle_Set &dispatch_set);
+ // Dispatch any exception handlers. Returns -1 if the state of the
+ // <wait_set_> has changed, else returns number of handlers
+ // dispatched.
+
+ virtual int dispatch_input_handlers (int &number_of_active_handles,
+ ACE_Reactor_Handle_Set &dispatch_set);
+ // Dispatch any input handlers. Returns -1 if the state of the
+ // <wait_set_> has changed, else returns number of handlers
+ // dispatched.
+
virtual void notify_handle (ACE_HANDLE handle,
ACE_Reactor_Mask mask,
ACE_Handle_Set &,
@@ -666,8 +714,8 @@ protected:
// Tracks handles that are waited for by select().
ACE_Reactor_Handle_Set ready_set_;
- // Track handles we are interested for various events that must be
- // dispatched *without* requiring select().
+ // Track HANDLES we are interested in for various events that must
+ // be dispatched *without* going through select().
int restart_;
// Restart automatically when interrupted
@@ -679,12 +727,25 @@ protected:
// requeued at the front of the list. Else if it's > 1 then that
// indicates the number of waiters to skip over.
+ int max_notify_iterations_;
+ // Keeps track of the maximum number of times that the
+ // <ACE_Reactor_Notify::handle_input> method will iterate and
+ // dispatch the <ACE_Event_Handlers> that are passed in via the
+ // notify pipe before breaking out of its <recv> loop. By default,
+ // this is set to -1, which means "iterate until the pipe is empty."
+
int initialized_;
// True if we've been initialized yet...
ACE_thread_t owner_;
// The original thread that created this Reactor.
+ int state_changed_;
+ // True if state has changed during dispatching of
+ // <ACE_Event_Handlers>, else false. This is used to determine
+ // whether we need to make another trip through the <Reactor>'s
+ // <wait_for_multiple_events> loop.
+
#if defined (ACE_MT_SAFE)
ACE_Reactor_Notify notify_handler_;
// Callback object that unblocks the ACE_Reactor if it's sleeping.
diff --git a/ace/ReactorEx.cpp b/ace/ReactorEx.cpp
index 541ee8b13c2..7b502036df8 100644
--- a/ace/ReactorEx.cpp
+++ b/ace/ReactorEx.cpp
@@ -187,14 +187,14 @@ ACE_ReactorEx_Handler_Repository::bind (ACE_HANDLE handle,
}
int
-ACE_ReactorEx_Handler_Repository::changes_required ()
+ACE_ReactorEx_Handler_Repository::changes_required (void)
{
// Check if handles have be scheduled for additions or removal
return (this->handles_to_be_added_ > 0) || (this->handles_to_be_deleted_ > 0);
}
int
-ACE_ReactorEx_Handler_Repository::make_changes ()
+ACE_ReactorEx_Handler_Repository::make_changes (void)
{
// This method must ONLY be called by the
// <ReactorEx->change_state_thread_>. We therefore assume that there
@@ -211,7 +211,7 @@ ACE_ReactorEx_Handler_Repository::make_changes ()
}
int
-ACE_ReactorEx_Handler_Repository::handle_deletions ()
+ACE_ReactorEx_Handler_Repository::handle_deletions (void)
{
// This will help us in keeping track of the last valid index in the
// handle arrays
@@ -250,7 +250,7 @@ ACE_ReactorEx_Handler_Repository::handle_deletions ()
}
int
-ACE_ReactorEx_Handler_Repository::handle_additions ()
+ACE_ReactorEx_Handler_Repository::handle_additions (void)
{
// Go through the <to_be_added_*> arrays
for (int i = 0; i < this->handles_to_be_added_; i++)
@@ -388,7 +388,7 @@ ACE_ReactorEx::~ACE_ReactorEx (void)
}
void
-ACE_ReactorEx::wakeup_all_threads ()
+ACE_ReactorEx::wakeup_all_threads (void)
{
this->wakeup_all_threads_.signal ();
}
@@ -513,8 +513,160 @@ ACE_ReactorEx::ok_to_wait (ACE_Time_Value *max_wait_time,
return 1;
}
+ACE_HANDLE
+ACE_ReactorEx_Notify::get_handle (void) const
+{
+ return this->wakeup_one_thread_.handle ();
+}
+
+// Handle all pending notifications.
+
+int
+ACE_ReactorEx_Notify::handle_signal (int signum,
+ siginfo_t *siginfo,
+ ucontext_t *)
+{
+ ACE_UNUSED_ARG (signum);
+
+ // Just check for sanity...
+ if (siginfo->si_handle_ != this->wakeup_one_thread_.handle ())
+ return -1;
+
+ for (int i = 1; ; i++)
+ {
+ ACE_Message_Block *mb = 0;
+
+ if (this->message_queue_.dequeue_head
+ (mb, (ACE_Time_Value *) &ACE_Time_Value::zero) == -1)
+ {
+ if (errno == EWOULDBLOCK)
+ // We've reached the end of the processing, return
+ // normally.
+ return 0;
+ else
+ return -1; // Something weird happened...
+ }
+ else
+ {
+ ACE_Notification_Buffer *buffer =
+ (ACE_Notification_Buffer *) mb->base ();
+
+ // If eh == 0 then we've got major problems! Otherwise, we
+ // need to dispatch the appropriate handle_* method on the
+ // ACE_Event_Handler pointer we've been passed.
+
+ if (buffer->eh_ != 0)
+ {
+ int result = 0;
+
+ switch (buffer->mask_)
+ {
+ case ACE_Event_Handler::READ_MASK:
+ result = buffer->eh_->handle_input (ACE_INVALID_HANDLE);
+ break;
+ case ACE_Event_Handler::WRITE_MASK:
+ result = buffer->eh_->handle_output (ACE_INVALID_HANDLE);
+ break;
+ case ACE_Event_Handler::EXCEPT_MASK:
+ result = buffer->eh_->handle_exception (ACE_INVALID_HANDLE);
+ break;
+ default:
+ ACE_ERROR ((LM_ERROR, "invalid mask = %d\n", buffer->mask_));
+ break;
+ }
+ if (result == -1)
+ buffer->eh_->handle_close (ACE_INVALID_HANDLE,
+ ACE_Event_Handler::EXCEPT_MASK);
+ }
+
+ // Make sure to delete the memory regardless of success or
+ // failure!
+ mb->release ();
+
+ // Bail out if we've reached the <notify_threshold_>. Note
+ // that by default <notify_threshold_> is -1, so we'll loop
+ // until we're done.
+ if (i == this->max_notify_iterations_)
+ break;
+ }
+ }
+}
+
+// Notify the ReactorEx, potentially enqueueing the
+// <ACE_Event_Handler> for subsequent processing in the ReactorEx
+// thread of control.
+
int
-ACE_ReactorEx::update_state ()
+ACE_ReactorEx_Notify::notify (ACE_Event_Handler *eh,
+ ACE_Reactor_Mask mask,
+ ACE_Time_Value *timeout)
+{
+ if (eh != 0)
+ {
+ ACE_Message_Block *mb = 0;
+ ACE_NEW_RETURN (mb, ACE_Message_Block (sizeof ACE_Notification_Buffer), -1);
+
+ ACE_Notification_Buffer *buffer =
+ (ACE_Notification_Buffer *) mb->base ();
+ buffer->eh_ = eh;
+ buffer->mask_ = mask;
+
+ // Convert from relative time to absolute time by adding the
+ // current time of day. This is what <ACE_Message_Queue>
+ // expects.
+ if (timeout != 0)
+ *timeout += ACE_OS::gettimeofday ();
+
+ if (this->message_queue_.enqueue_tail
+ (mb, timeout) == -1)
+ {
+ mb->release ();
+ return -1;
+ }
+ }
+
+ return this->wakeup_one_thread_.signal ();
+}
+
+void
+ACE_ReactorEx_Notify::max_notify_iterations (int iterations)
+{
+ ACE_TRACE ("ACE_ReactorEx_Notify::max_notify_iterations");
+ // Must always be > 0 or < 0 to optimize the loop exit condition.
+ if (iterations == 0)
+ iterations = 1;
+
+ this->max_notify_iterations_ = iterations;
+}
+
+int
+ACE_ReactorEx_Notify::max_notify_iterations (void)
+{
+ ACE_TRACE ("ACE_ReactorEx_Notify::max_notify_iterations");
+ return this->max_notify_iterations_;
+}
+
+void
+ACE_ReactorEx::max_notify_iterations (int iterations)
+{
+ ACE_TRACE ("ACE_ReactorEx::max_notify_iterations");
+ ACE_GUARD_RETURN (ACE_Process_Mutex, monitor, this->lock_, -1);
+
+ // Must always be > 0 or < 0 to optimize the loop exit condition.
+ this->notify_handler_.max_notify_iterations (iterations);
+}
+
+int
+ACE_ReactorEx::max_notify_iterations (void)
+{
+ ACE_TRACE ("ACE_ReactorEx::max_notify_iterations");
+ ACE_GUARD_RETURN (ACE_Process_Mutex, monitor, this->lock_, -1);
+
+ return this->notify_handler_.max_notify_iterations ();
+}
+
+int
+ACE_ReactorEx::update_state (void)
{
// This GUARD is necessary since we are updating shared state.
ACE_GUARD_RETURN (ACE_Process_Mutex, monitor, this->lock_, -1);
@@ -684,7 +836,8 @@ ACE_ReactorEx::dispatch_handler (int index)
// ************************************************************
-ACE_ReactorEx_Notify::ACE_ReactorEx_Notify ()
+ACE_ReactorEx_Notify::ACE_ReactorEx_Notify (void)
+ : max_notify_iterations (-1)
{
}
@@ -694,112 +847,4 @@ ACE_ReactorEx_Notify::open (ACE_ReactorEx &reactorEx)
return reactorEx.register_handler (this);
}
-ACE_HANDLE
-ACE_ReactorEx_Notify::get_handle (void) const
-{
- return this->wakeup_one_thread_.handle ();
-}
-
-// Handle all pending notifications.
-
-int
-ACE_ReactorEx_Notify::handle_signal (int signum,
- siginfo_t *siginfo,
- ucontext_t *)
-{
- ACE_UNUSED_ARG (signum);
-
- // Just check for sanity...
- if (siginfo->si_handle_ != this->wakeup_one_thread_.handle ())
- return -1;
-
- for (;;)
- {
- ACE_Message_Block *mb = 0;
-
- if (this->message_queue_.dequeue_head
- (mb, (ACE_Time_Value *) &ACE_Time_Value::zero) == -1)
- {
- if (errno == EWOULDBLOCK)
- // We've reached the end of the processing, return
- // normally.
- return 0;
- else
- return -1; // Something weird happened...
- }
- else
- {
- ACE_Notification_Buffer *buffer =
- (ACE_Notification_Buffer *) mb->base ();
-
- // If eh == 0 then we've got major problems! Otherwise, we
- // need to dispatch the appropriate handle_* method on the
- // ACE_Event_Handler pointer we've been passed.
-
- if (buffer->eh_ != 0)
- {
- int result = 0;
-
- switch (buffer->mask_)
- {
- case ACE_Event_Handler::READ_MASK:
- result = buffer->eh_->handle_input (ACE_INVALID_HANDLE);
- break;
- case ACE_Event_Handler::WRITE_MASK:
- result = buffer->eh_->handle_output (ACE_INVALID_HANDLE);
- break;
- case ACE_Event_Handler::EXCEPT_MASK:
- result = buffer->eh_->handle_exception (ACE_INVALID_HANDLE);
- break;
- default:
- ACE_ERROR ((LM_ERROR, "invalid mask = %d\n", buffer->mask_));
- break;
- }
- if (result == -1)
- buffer->eh_->handle_close (ACE_INVALID_HANDLE,
- ACE_Event_Handler::EXCEPT_MASK);
- }
- // Make sure to delete the memory regardless of success or
- // failure!
- mb->release ();
- }
- }
-}
-
-// Notify the ReactorEx, potentially enqueueing the
-// <ACE_Event_Handler> for subsequent processing in the ReactorEx
-// thread of control.
-
-int
-ACE_ReactorEx_Notify::notify (ACE_Event_Handler *eh,
- ACE_Reactor_Mask mask,
- ACE_Time_Value *timeout)
-{
- if (eh != 0)
- {
- ACE_Message_Block *mb = 0;
- ACE_NEW_RETURN (mb, ACE_Message_Block (sizeof ACE_Notification_Buffer), -1);
-
- ACE_Notification_Buffer *buffer =
- (ACE_Notification_Buffer *) mb->base ();
- buffer->eh_ = eh;
- buffer->mask_ = mask;
-
- // Convert from relative time to absolute time by adding the
- // current time of day. This is what <ACE_Message_Queue>
- // expects.
- if (timeout != 0)
- *timeout += ACE_OS::gettimeofday ();
-
- if (this->message_queue_.enqueue_tail
- (mb, timeout) == -1)
- {
- mb->release ();
- return -1;
- }
- }
-
- return this->wakeup_one_thread_.signal ();
-}
-
#endif /* ACE_WIN32 */
diff --git a/ace/ReactorEx.h b/ace/ReactorEx.h
index 5c3040c32bf..dd7ce71a135 100644
--- a/ace/ReactorEx.h
+++ b/ace/ReactorEx.h
@@ -86,10 +86,10 @@ public:
// Pointer to the beginning of the current array of
// <ACE_Event_Handler> *'s.
- virtual int changes_required ();
+ virtual int changes_required (void);
// Check if changes to the handle set are required.
- virtual int make_changes ();
+ virtual int make_changes (void);
// Make changes to the handle set
void dump (void) const;
@@ -99,10 +99,10 @@ private:
ACE_ReactorEx &reactorEx_;
// Reference to our <ReactorEx>.
- int handle_deletions ();
+ int handle_deletions (void);
// Add handles to the handle set
- int handle_additions ();
+ int handle_additions (void);
// Remove handles from the handle set
int remove_handler_i (size_t index,
@@ -177,6 +177,24 @@ public:
virtual ACE_HANDLE get_handle (void) const;
// Returns a handle to the <ACE_Auto_Event>.
+ void max_notify_iterations (int);
+ // Set the maximum number of times that the
+ // <ACE_ReactorEx_Notify::handle_input> method will iterate and
+ // dispatch the <ACE_Event_Handlers> that are passed in via the
+ // notify queue before breaking out of its
+ // <ACE_Message_Queue::dequeue> loop. By default, this is set to
+ // -1, which means "iterate until the queue is empty." Setting this
+ // to a value like "1 or 2" will increase "fairness" (and thus
+ // prevent starvation) at the expense of slightly higher dispatching
+ // overhead.
+
+ int max_notify_iterations (void);
+ // Get the maximum number of times that the
+ // <ACE_ReactorEx_Notify::handle_input> method will iterate and
+ // dispatch the <ACE_Event_Handlers> that are passed in via the
+ // notify queue before breaking out of its
+ // <ACE_Message_Queue::dequeue> loop.
+
private:
virtual int handle_signal (int signum, siginfo_t * = 0, ucontext_t * = 0);
// Called when the notification event waited on by <ACE_ReactorEx>
@@ -193,6 +211,14 @@ private:
// Message queue that keeps track of pending <ACE_Event_Handlers>.
// This queue must be thread-safe because it can be called by
// multiple threads of control.
+
+ int max_notify_iterations_;
+ // Keeps track of the maximum number of times that the
+ // <ACE_ReactorEx_Notify::handle_input> method will iterate and
+ // dispatch the <ACE_Event_Handlers> that are passed in via the
+ // notify queue before breaking out of its
+ // <ACE_Message_Queue::dequeue> loop. By default, this is set to
+ // -1, which means "iterate until the queue is empty."
};
#if defined (ACE_WIN32)
@@ -345,6 +371,24 @@ public:
// 0, the caller will block until action is possible, else will wait
// until the relative time specified in <timeout> elapses).
+ void max_notify_iterations (int);
+ // Set the maximum number of times that the
+ // <ACE_ReactorEx_Notify::handle_input> method will iterate and
+ // dispatch the <ACE_Event_Handlers> that are passed in via the
+ // notify queue before breaking out of its
+ // <ACE_Message_Queue::dequeue> loop. By default, this is set to
+ // -1, which means "iterate until the queue is empty." Setting this
+ // to a value like "1 or 2" will increase "fairness" (and thus
+ // prevent starvation) at the expense of slightly higher dispatching
+ // overhead.
+
+ int max_notify_iterations (void);
+ // Get the maximum number of times that the
+ // <ACE_ReactorEx_Notify::handle_input> method will iterate and
+ // dispatch the <ACE_Event_Handlers> that are passed in via the
+ // notify queue before breaking out of its
+ // <ACE_Message_Queue::dequeue> loop.
+
void dump (void) const;
// Dump the state of an object.
@@ -360,7 +404,7 @@ public:
protected:
virtual int ok_to_wait (ACE_Time_Value *max_wait_time,
int alertable);
- // Check to see if it is ok to enter ::WaitForMultipleObjects()
+ // Check to see if it is ok to enter ::WaitForMultipleObjects().
virtual int wait_for_multiple_events (ACE_Time_Value *max_wait_time,
int alertable);
@@ -386,10 +430,10 @@ private:
int calculate_timeout (ACE_Time_Value *time);
// Used to caluculate the next timeout
- int update_state ();
+ int update_state (void);
// Update the state of the handler repository
- void wakeup_all_threads ();
+ void wakeup_all_threads (void);
// Wake up all threads in WaitForMultipleObjects so that they can
// reconsult the handle set
diff --git a/ace/SOCK_Stream.h b/ace/SOCK_Stream.h
index 4111c01d12e..aa9bd905047 100644
--- a/ace/SOCK_Stream.h
+++ b/ace/SOCK_Stream.h
@@ -41,7 +41,9 @@ public:
ssize_t recv_n (void *buf, int n, int flags) const;
// Recv n bytes, keep trying until n are received.
- ssize_t send_n (const void *buf, size_t len, int flags,
+ ssize_t send_n (const void *buf,
+ size_t len,
+ int flags,
const ACE_Time_Value *timeout);
// Try to send exactly <len> bytes into <buf> from <handle> (uses
// the <send> call). If <send> blocks for longer than <timeout> the
@@ -49,12 +51,19 @@ public:
// If a timeout does not occur, <send_n> return <len> (i.e., the
// number of bytes requested to be sent).
- ssize_t recv_n (void *buf, size_t len, int flags,
+ ssize_t recv_n (void *buf,
+ size_t len,
+ int flags,
const ACE_Time_Value *timeout);
- // Wait up to <timeout> amount of time to receive up to <len> bytes
- // into <buf> from <handle> (uses the <recv> call). If <recv> times
- // out a -1 is returned with <errno == ETIME>. If it succeeds the
- // number of bytes received is returned.
+ // Try to recv exactly <len> bytes into <buf> from <handle> (uses
+ // the <ACE::recv_n> call). The <ACE_Time_Value> indicates how long
+ // to blocking trying to receive. If <timeout> == 0, the caller
+ // will block until action is possible, else will wait until the
+ // relative time specified in *<timeout> elapses). If <recv> blocks
+ // for longer than <timeout> the number of bytes actually read is
+ // returned with <errno == ETIME>. If a timeout does not occur,
+ // <recv_n> return <len> (i.e., the number of bytes requested to be
+ // read).
// = Send/receive an ``urgent'' character (see TCP specs...).
ssize_t send_urg (void *ptr, int len = sizeof (char));
@@ -69,7 +78,7 @@ public:
int close (void);
// Close down the socket (we need this to make things work correctly
// on Win32, which requires use to do a <close_writer> before doing
- // the close in order to avoid losing data).
+ // the close to avoid losing data).
// = Meta-type info
typedef ACE_INET_Addr PEER_ADDR;
diff --git a/ace/SOCK_Stream.i b/ace/SOCK_Stream.i
index 6bf38fbaf26..d916914da4a 100644
--- a/ace/SOCK_Stream.i
+++ b/ace/SOCK_Stream.i
@@ -35,7 +35,7 @@ inline ssize_t
ACE_SOCK_Stream::recv_n (void *buf, size_t len, int flags,
const ACE_Time_Value *timeout)
{
- ACE_TRACE ("ACE_SOCK_Stream::send_n");
+ ACE_TRACE ("ACE_SOCK_Stream::recv_n");
return ACE::recv_n (this->get_handle (), buf, len, flags, timeout);
}
diff --git a/ace/TTY_IO.cpp b/ace/TTY_IO.cpp
index b0a2f3ec20b..c2cb94dcfcb 100644
--- a/ace/TTY_IO.cpp
+++ b/ace/TTY_IO.cpp
@@ -193,7 +193,13 @@ ACE_TTY_IO::control (Control_Mode cmd,
dcb.fRtsControl = RTS_CONTROL_DISABLE ;
}
dcb.fBinary = TRUE ;
- return ::SetCommState (this->get_handle (), &dcb);
+ ::SetCommState (this->get_handle (), &dcb);
+
+ // 2/13/97 BWF added drop out timer
+ COMMTIMEOUTS timeouts;
+ ::GetCommTimeouts (this->get_handle(), &timeouts) ;
+ timeouts.ReadIntervalTimeout = arg->readtimeoutmsec ;
+ return ::SetCommTimeouts (this->get_handle (), &timeouts) ;
case GETPARAMS:
ACE_NOTSUP_RETURN (-1); // Not yet implemented.
diff --git a/apps/Gateway/Gateway/proxy_config b/apps/Gateway/Gateway/proxy_config
index fa4fcb13a91..ae3a26a7f07 100644
--- a/apps/Gateway/Gateway/proxy_config
+++ b/apps/Gateway/Gateway/proxy_config
@@ -34,8 +34,8 @@
# Proxy Host Remote Proxy Max Retry Local Priority
# ID Port Role Timeout Port
# ---- -------- ------ ------ ---------- ----- --------
- 1 tango.cs 10003 S 32 0 1
- 2 tango.cs 10002 C 32 0 1
+ 1 tango.cs 10011 S 32 0 1
+ 2 tango.cs 10010 C 32 0 1
# 3 mambo.cs 10002 C 32 0 1
# 4 lambada.cs 10002 C 32 0 1
# 5 lambada.cs 10002 C 32 0 1
diff --git a/apps/Gateway/Gateway/svc.conf b/apps/Gateway/Gateway/svc.conf
index c822713287b..9ec4dc429dc 100644
--- a/apps/Gateway/Gateway/svc.conf
+++ b/apps/Gateway/Gateway/svc.conf
@@ -1,3 +1,3 @@
#static Svc_Manager "-d -p 2913"
-dynamic Gateway Service_Object * ./Gateway:_make_Gateway() active "-d -c -P proxy_config -C consumer_config"
+dynamic Gateway Service_Object * ./Gateway:_make_Gateway() active "-d -c -P proxy_config -C consumer_config -tOUTPUT_MT"
diff --git a/examples/Reactor/Misc/test_signals_1.cpp b/examples/Reactor/Misc/test_signals_1.cpp
index 710c07a3eff..fcb7ba8d05f 100644
--- a/examples/Reactor/Misc/test_signals_1.cpp
+++ b/examples/Reactor/Misc/test_signals_1.cpp
@@ -1,5 +1,13 @@
+// This simple program illustrates the difference between handling
+// signals via the Reactor (which doesn't cause the event loop to
+// terminate) and signals that aren't handled via the Reactor (which
+// do).
+
#include "ace/Service_Config.h"
+// Number of times to allow signal to execute until we quit.
+static size_t count = 10;
+
static void
my_signal_function (int sig)
{
@@ -13,7 +21,14 @@ public:
siginfo_t *,
ucontext_t *)
{
- ACE_DEBUG ((LM_DEBUG, "Executed ACE signal handler for sig %S\n", sig));
+ ACE_DEBUG ((LM_DEBUG, "Executed ACE signal handler for sig %S, count = %d\n",
+ sig, count));
+
+ count--;
+
+ if (count == 0)
+ ACE_Service_Config::end_reactor_event_loop ();
+
return 0;
}
@@ -54,10 +69,10 @@ main (int, char *argv[])
// This just executes the reactor events until my_handler tells us
// we are finished.
- ACE_DEBUG ((LM_DEBUG, "starting event loop\n"));
+ ACE_DEBUG ((LM_DEBUG, "starting event loop that runs until you've typed ^C a total of 10 times or ^\\ once.\n"));
- int result = my_config.run_reactor_event_loop ();
+ while (my_config.reactor_event_loop_done () == 0)
+ my_config.run_reactor_event_loop ();
- ACE_DEBUG ((LM_DEBUG, "result = %d\n", result));
return 0;
}
diff --git a/examples/Reactor/ReactorEx/test_MT.cpp b/examples/Reactor/ReactorEx/test_MT.cpp
index 6a7975c67dc..f3c910705bc 100644
--- a/examples/Reactor/ReactorEx/test_MT.cpp
+++ b/examples/Reactor/ReactorEx/test_MT.cpp
@@ -66,8 +66,8 @@ parse_args (int argc, char **argv)
class Task_Handler : public ACE_Task<ACE_NULL_SYNCH>
{
public:
- Task_Handler (int number_of_handles,
- int concurrent_threads);
+ Task_Handler (size_t number_of_handles,
+ size_t concurrent_threads);
// Constructor.
~Task_Handler (void);
@@ -79,7 +79,7 @@ public:
int svc (void);
// Task event loop.
- int signal (int index);
+ int signal (size_t index);
// Signal an event.
private:
@@ -96,11 +96,12 @@ Task_Handler::svc (void)
ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "handle_events"), 0);
}
-Task_Handler::Task_Handler (int number_of_handles,
- int concurrent_threads)
+Task_Handler::Task_Handler (size_t number_of_handles,
+ size_t int concurrent_threads)
{
ACE_NEW (this->events_, ACE_Auto_Event [number_of_handles]);
- for (int i = 0; i < number_of_handles; i++)
+
+ for (size_t i = 0; i < number_of_handles; i++)
{
if (ACE_Service_Config::reactorEx ()->register_handler (this,
this->events_[i].handle ()) == -1)
@@ -144,7 +145,7 @@ Task_Handler::handle_signal (int signum, siginfo_t *siginfo, ucontext_t *)
}
int
-Task_Handler::signal (int index)
+Task_Handler::signal (size_T index)
{
return this->events_[index].signal ();
}
diff --git a/examples/Reactor/WFMO_Reactor/test_MT.cpp b/examples/Reactor/WFMO_Reactor/test_MT.cpp
index 6a7975c67dc..f3c910705bc 100644
--- a/examples/Reactor/WFMO_Reactor/test_MT.cpp
+++ b/examples/Reactor/WFMO_Reactor/test_MT.cpp
@@ -66,8 +66,8 @@ parse_args (int argc, char **argv)
class Task_Handler : public ACE_Task<ACE_NULL_SYNCH>
{
public:
- Task_Handler (int number_of_handles,
- int concurrent_threads);
+ Task_Handler (size_t number_of_handles,
+ size_t concurrent_threads);
// Constructor.
~Task_Handler (void);
@@ -79,7 +79,7 @@ public:
int svc (void);
// Task event loop.
- int signal (int index);
+ int signal (size_t index);
// Signal an event.
private:
@@ -96,11 +96,12 @@ Task_Handler::svc (void)
ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "handle_events"), 0);
}
-Task_Handler::Task_Handler (int number_of_handles,
- int concurrent_threads)
+Task_Handler::Task_Handler (size_t number_of_handles,
+ size_t int concurrent_threads)
{
ACE_NEW (this->events_, ACE_Auto_Event [number_of_handles]);
- for (int i = 0; i < number_of_handles; i++)
+
+ for (size_t i = 0; i < number_of_handles; i++)
{
if (ACE_Service_Config::reactorEx ()->register_handler (this,
this->events_[i].handle ()) == -1)
@@ -144,7 +145,7 @@ Task_Handler::handle_signal (int signum, siginfo_t *siginfo, ucontext_t *)
}
int
-Task_Handler::signal (int index)
+Task_Handler::signal (size_T index)
{
return this->events_[index].signal ();
}
diff --git a/tests/Future_Test.cpp b/tests/Future_Test.cpp
index 1195afb287d..c971d31df8f 100644
--- a/tests/Future_Test.cpp
+++ b/tests/Future_Test.cpp
@@ -284,7 +284,6 @@ Scheduler::work (u_long newparam, int newcount)
static int n_loops = 100;
#if defined (ACE_TEMPLATES_REQUIRE_SPECIALIZATION)
-template class ACE_Atomic_Op<ACE_Thread_Mutex, int>;
template class ACE_Future<const char *>;
template class ACE_Future<u_long>;
#endif /* ACE_TEMPLATES_REQUIRE_SPECIALIZATION */
diff --git a/tests/Makefile b/tests/Makefile
index e1e500bb8f6..4b1cbba8335 100644
--- a/tests/Makefile
+++ b/tests/Makefile
@@ -26,6 +26,7 @@ BIN = Barrier_Test \
Pipe_Test \
Reactors_Test \
Reactor_Exceptions_Test \
+ Reactor_Notify_Test \
Reactor_Timer_Test \
Reader_Writer_Test \
Recursive_Mutex_Test \
diff --git a/tests/Reactor_Notify_Test.cpp b/tests/Reactor_Notify_Test.cpp
new file mode 100644
index 00000000000..89bf1629801
--- /dev/null
+++ b/tests/Reactor_Notify_Test.cpp
@@ -0,0 +1,204 @@
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// tests
+//
+// = FILENAME
+// Reactors_Test.cpp
+//
+// = DESCRIPTION
+// This is a test that illustrates how the Reactor's notify()
+// method works under various settings of max_notify_iterations().
+//
+// = AUTHOR
+// Douglas C. Schmidt
+//
+// ============================================================================
+
+#include "ace/Synch.h"
+#include "ace/Service_Config.h"
+#include "ace/Task.h"
+#include "test_config.h"
+
+#if defined (ACE_HAS_THREADS)
+
+class Supplier_Task : public ACE_Task<ACE_MT_SYNCH>
+{
+public:
+ Supplier_Task (void);
+ // Constructor.
+
+ ~Supplier_Task (void);
+ // Destructor.
+
+ virtual int open (void * = 0);
+ // Make this an Active Object.
+
+ virtual int close (u_long);
+ // Close down the supplier.
+
+ virtual int svc (void);
+ // Generates events and sends them to the <Reactor>'s <notify>
+ // method.
+
+ virtual int handle_exception (ACE_HANDLE);
+ // Releases the <waiter_> semaphore when called by the <Reactor>'s
+ // notify handler.
+
+ virtual int handle_output (ACE_HANDLE);
+ // Called every time through the main Reactor event loop to
+ // illustrate the difference between "limited" and "unlimited"
+ // notification.
+
+private:
+ ACE_Thread_Semaphore waiter_;
+ // Used to hand-shake between the <Supplier_Task> and the
+ // <Reactor>'s notify mechanism.
+
+ ACE_Pipe pipe_;
+ // We use this pipe just so we can get a handle that is always
+ // "active."
+};
+
+Supplier_Task::Supplier_Task (void)
+ : waiter_ (0) // Make semaphore "locked" by default.
+{
+}
+
+int
+Supplier_Task::open (void *)
+{
+ if (this->pipe_.open () == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "open failed"), -1);
+ else if (ACE_Service_Config::reactor ()->register_handler
+ (this->pipe_.write_handle (), this, ACE_Event_Handler::WRITE_MASK) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "register_handler failed"), -1);
+ else
+ // Make this an Active Object.
+ return this->activate (THR_BOUND);
+}
+
+int
+Supplier_Task::close (u_long)
+{
+ ACE_DEBUG ((LM_DEBUG, "(%t) Supplier_Task::close\n"));
+
+ if (ACE_Service_Config::reactor ()->remove_handler
+ (this->pipe_.write_handle (), ACE_Event_Handler::WRITE_MASK) == -1)
+ ACE_ERROR ((LM_ERROR, "(%t) %p\n", "remove_handler failed"));
+ return 0;
+}
+
+Supplier_Task::~Supplier_Task (void)
+{
+ ACE_DEBUG ((LM_DEBUG, "(%t) ~Supplier_Task\n"));
+}
+
+int
+Supplier_Task::svc (void)
+{
+ // ACE_NEW_THREAD;
+
+ size_t i;
+
+ ACE_DEBUG ((LM_DEBUG, "(%t) **** starting unlimited notifications test\n"));
+
+ // Allow an unlimited number of iterations per
+ // <ACE_Reactor::notify>.
+ ACE_Service_Config::reactor ()->max_notify_iterations (-1);
+
+ for (i = 0; i < ACE_MAX_ITERATIONS; i++)
+ {
+ ACE_DEBUG ((LM_DEBUG, "(%t) notifying reactor\n"));
+ // Notify the Reactor.
+ if (ACE_Service_Config::reactor ()->notify (this) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "notify"), -1);
+
+ // Wait for our <handle_exception> method to release the
+ // semaphore.
+ else if (this->waiter_.acquire () == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "acquire"), -1);
+ }
+
+ ACE_DEBUG ((LM_DEBUG, "(%t) **** starting limited notifications test\n"));
+
+ // Only allow 1 iteration per <ACE_Reactor::notify>
+ ACE_Service_Config::reactor ()->max_notify_iterations (1);
+
+ for (i = 0; i < ACE_MAX_ITERATIONS; i++)
+ {
+ ACE_DEBUG ((LM_DEBUG, "(%t) notifying reactor\n"));
+ // Notify the Reactor.
+ if (ACE_Service_Config::reactor ()->notify (this) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "notify"), -1);
+
+ // Wait for our <handle_exception> method to release the
+ // semaphore.
+ else if (this->waiter_.acquire () == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "acquire"), -1);
+ }
+
+ ACE_DEBUG ((LM_DEBUG, "(%t) **** exiting thread test\n"));
+ return 0;
+}
+
+int
+Supplier_Task::handle_exception (ACE_HANDLE handle)
+{
+ ACE_ASSERT (handle == ACE_INVALID_HANDLE);
+ ACE_DEBUG ((LM_DEBUG, "(%t) handle_exception\n"));
+
+ this->waiter_.release ();
+ return 0;
+}
+
+int
+Supplier_Task::handle_output (ACE_HANDLE handle)
+{
+ ACE_ASSERT (handle == this->pipe_.write_handle ());
+ ACE_DEBUG ((LM_DEBUG, "(%t) handle_output\n"));
+ return 0;
+}
+
+#endif /* ACE_HAS_THREADS */
+
+int
+main (int, char *[])
+{
+ // ACE_START_TEST ("Reactors_Test");
+
+#if defined (ACE_HAS_THREADS)
+ ACE_Service_Config daemon;
+ ACE_ASSERT (ACE_LOG_MSG->op_status () != -1);
+
+ Supplier_Task task;
+ ACE_ASSERT (ACE_LOG_MSG->op_status () != -1);
+
+ if (task.open () == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) open failed\n"), -1);
+ else
+ {
+ ACE_Time_Value timeout (2);
+
+ for (;;)
+ {
+ // Use a timeout to inform the Reactor when to shutdown.
+ switch (ACE_Service_Config::reactor ()->handle_events (timeout))
+ {
+ case -1:
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "reactor"), -1);
+ /* NOTREACHED */
+ case 0:
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) timeout\n"), 0);
+ /* NOTREACHED */
+ }
+ }
+ }
+#else
+ ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n"));
+#endif /* ACE_HAS_THREADS */
+ // ACE_END_TEST;
+ return 0;
+}
diff --git a/tests/Reactors_Test.cpp b/tests/Reactors_Test.cpp
index 5a30488f5cd..18ad9f3e4d7 100644
--- a/tests/Reactors_Test.cpp
+++ b/tests/Reactors_Test.cpp
@@ -17,7 +17,6 @@
//
// ============================================================================
-#include "ace/Reactor.h"
#include "ace/Synch.h"
#include "ace/Service_Config.h"
#include "ace/Task.h"
@@ -25,7 +24,6 @@
#if defined (ACE_HAS_THREADS)
-static const int NUM_INVOCATIONS = 10;
static const int MAX_TASKS = 20;
class Test_Task : public ACE_Task<ACE_MT_SYNCH>
@@ -100,7 +98,7 @@ Test_Task::svc (void)
{
ACE_NEW_THREAD;
- for (int i = 0; i < NUM_INVOCATIONS; i++)
+ for (int i = 0; i < ACE_MAX_ITERATIONS; i++)
{
ACE_OS::thr_yield ();
@@ -130,7 +128,7 @@ Test_Task::handle_input (ACE_HANDLE)
{
this->handled_++;
- if (this->handled_ == NUM_INVOCATIONS)
+ if (this->handled_ == ACE_MAX_ITERATIONS)
{
done_count--;
ACE_DEBUG ((LM_DEBUG,
@@ -158,6 +156,7 @@ worker (void *args)
for (;;)
{
+ // Use a timeout to inform the Reactor when to shutdown.
switch (reactor->handle_events (timeout))
{
case -1:
@@ -172,10 +171,6 @@ worker (void *args)
return 0;
}
-#if defined (ACE_TEMPLATES_REQUIRE_SPECIALIZATION)
-template class ACE_Atomic_Op<ACE_Thread_Mutex, int>;
-#endif /* ACE_TEMPLATES_REQUIRE_SPECIALIZATION */
-
#endif /* ACE_HAS_THREADS */
int
diff --git a/tests/Reader_Writer_Test.cpp b/tests/Reader_Writer_Test.cpp
index cdd70035634..74c2a154c24 100644
--- a/tests/Reader_Writer_Test.cpp
+++ b/tests/Reader_Writer_Test.cpp
@@ -144,10 +144,6 @@ writer (void *)
return 0;
}
-#if defined (ACE_TEMPLATES_REQUIRE_SPECIALIZATION)
-template class ACE_Atomic_Op<ACE_Thread_Mutex, int>;
-#endif /* ACE_TEMPLATES_REQUIRE_SPECIALIZATION */
-
#endif /* ACE_HAS_THREADS */
// Spawn off threads.
diff --git a/tests/run_tests.sh b/tests/run_tests.sh
index 08c90d9a463..135d7058855 100755
--- a/tests/run_tests.sh
+++ b/tests/run_tests.sh
@@ -60,6 +60,7 @@ run Thread_Pool_Test # uses Thread_Manager, Task
run Future_Test # uses Thread_Manager, Task
run Reactors_Test # uses Task, Mutex, Reactor
run Reactor_Exceptions_Test # uses Reactor and C++ exceptions
+run Reactor_Notify_Test # uses Reactor's notify() method, Task
run Reactor_Timer_Test # uses Event_Handler, Reactor
run Reader_Writer_Test # uses Thread_Manager, Mutex
run SOCK_Test # uses Thread_Manager, SOCK_SAP