summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorschmidt <douglascraigschmidt@users.noreply.github.com>1999-09-01 05:27:46 +0000
committerschmidt <douglascraigschmidt@users.noreply.github.com>1999-09-01 05:27:46 +0000
commit7c9df2b94260de0683498f181e8550f6382ad702 (patch)
tree3ecd37f7db1de5869eecacf0074a491ac5200705
parent026bcd6b9b76835c8cdc1d4d32fafc25f5fd85ab (diff)
downloadATCD-7c9df2b94260de0683498f181e8550f6382ad702.tar.gz
ChangeLogTag:Wed Sep 1 00:05:04 1999 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu>
-rw-r--r--ChangeLog-99b64
-rw-r--r--ace/Containers_T.cpp2
-rw-r--r--ace/Containers_T.h3
-rw-r--r--ace/Event_Handler.h2
-rw-r--r--ace/OS.h4
-rw-r--r--ace/Process_Manager.cpp1
-rw-r--r--ace/Select_Reactor_Base.cpp164
-rw-r--r--ace/Select_Reactor_Base.h15
-rw-r--r--tests/Reactor_Notify_Test.cpp314
9 files changed, 434 insertions, 135 deletions
diff --git a/ChangeLog-99b b/ChangeLog-99b
index 68655b36a09..d6a3906880c 100644
--- a/ChangeLog-99b
+++ b/ChangeLog-99b
@@ -1,19 +1,41 @@
+Wed Sep 1 00:05:04 1999 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu>
-Tue Aug 31 20:10:49 1999 Yamuna Krishnamurthy <yamuna@cs.wustl.edu>
+ * tests/Reactor_Notify_Test.cpp: Modified this test to exercise
+ the new user-level notification queueing in the ACE_Reactor.
+
+Tue Aug 31 16:05:14 1999 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu>
- * ace/OS.h:
- Added the ACE_HAS_NO_THROW_SPEC check in order to suppress the
- generation of throw specs to placate some compilers
- Thank to Andreas Geisler <Andreas.Geisler@erls.siemens.de> for
- suggesting this addition
+ * ace/Process_Manager.cpp (handle_close): Added an ACE_UNUSED_ARG
+ to keep the compiler from warning that the "handle" parameter
+ isn't used.
+ * ace/OS.h: Added a new #defined called
+ ACE_REACTOR_NOTIFICATION_ARRAY_SIZE that designates the size of
+ the array of ACE_Notification_Buffers used by the new Reactor
+ user-level notification queue feature.
+
+ * ace/Select_Reactor_Base: Added support for a user-level
+ notification queue that can buffer very large amounts of pending
+ notifications, i.e., well beyond the 64 kbyte limit imposed by
+ the use of a socket. This feature is only enabled if
+ ACE_HAS_REACTOR_NOTIFICATION_QUEUE is enabled in a config.h
+ file. Thanks to Detlef Becker <Detlef.Becker@med.siemens.de>
+ for suggesting this and contributing a prototype implementation.
+
+Tue Aug 31 20:10:49 1999 Yamuna Krishnamurthy <yamuna@cs.wustl.edu>
+
+ * ace/OS.h: Added the ACE_HAS_NO_THROW_SPEC check in order to
+ suppress the generation of throw specs to placate some compilers
+ and make it easier for CORBA developers to by lazy wrt C++
+ exception throw specs ;-). Thank to Andreas Geisler
+ <Andreas.Geisler@erls.siemens.de> for suggesting this addition
1999-08-31 Vishal Kachroo <vishal@cs.wustl.edu>
- Modified the test to use a single handler registered for
- QOS_MASK|READ_MASK instead of two separate handlers. Had to
- remove the following files and add other files.
-
+ Modified the ACE GQoS test to use a single handler registered
+ for QOS_MASK|READ_MASK instead of two separate handlers. Had to
+ remove the following files and add other files.
+
* ACE_wrappers/examples/QOS/QOS_Event_Handler.cpp
* ACE_wrappers/examples/QOS/Read_Handler.h
* ACE_wrappers/examples/QOS/QOS_Event_Handler.h
@@ -23,30 +45,30 @@ Tue Aug 31 20:10:49 1999 Yamuna Krishnamurthy <yamuna@cs.wustl.edu>
The Sender event handler is only registered for QOS events. It
uses the ACE_OS::sendto () as soon as it receives a QOS event.
-
+
* ACE_wrappers/examples/QOS/Sender_QOS_Event_Handler.cpp
* ACE_wrappers/examples/QOS/Sender_QOS_Event_Handler.h
The Receiver event handler is registered for both QOS as well as
- READ events since it has to initially wait for a PATH RSVP message
- from the sender (QOS event, finally!!) and then read from the
- same QOS enabled socket.
-
+ READ events since it has to initially wait for a PATH RSVP
+ message from the sender (QOS event, finally!!) and then read
+ from the same QOS enabled socket.
+
* ACE_wrappers/examples/QOS/Receiver_QOS_Event_Handler.cpp
* ACE_wrappers/examples/QOS/Receiver_QOS_Event_Handler.cpp
The client and server have been modified to subscribe to the
- same multicast session. The RSVP SP on the receiver side sends
- RESV messages at the earliest indication of QOS parameters AND
- existence of a matching PATH state. This state is found only if
- a multicast socket has been created (by the sender) with a
- matching "multicast session" address.
+ same multicast session. The RSVP SP on the receiver side sends
+ RESV messages at the earliest indication of QOS parameters AND
+ existence of a matching PATH state. This state is found only if
+ a multicast socket has been created (by the sender) with a
+ matching "multicast session" address.
* ACE_wrappers/examples/QOS/server.cpp:
* ACE_wrappers/examples/QOS/client.cpp:
Modified the following to incorporate the removal and addition
- of files listed above.
+ of files listed above.
* ACE_wrappers/examples/QOS/QOS.dsw
* ACE_wrappers/examples/QOS/client.dsp
diff --git a/ace/Containers_T.cpp b/ace/Containers_T.cpp
index 9956bdcdb52..c21860c5ff1 100644
--- a/ace/Containers_T.cpp
+++ b/ace/Containers_T.cpp
@@ -416,7 +416,7 @@ ACE_Unbounded_Queue<T>::dump (void) const
for (ACE_Unbounded_Queue_Iterator<T> iter (*(ACE_Unbounded_Queue<T> *) this);
iter.next (item) != 0;
iter.advance ())
- ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("count = %d\n"), count++));
+ ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("count = %d\n"), count++));
ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
}
diff --git a/ace/Containers_T.h b/ace/Containers_T.h
index 410c1d23306..222d3a4dc34 100644
--- a/ace/Containers_T.h
+++ b/ace/Containers_T.h
@@ -475,7 +475,8 @@ public:
// = Additional utility methods.
void reset (void);
- // Reset the <ACE_Unbounded_Queue> to be empty.
+ // Reset the <ACE_Unbounded_Queue> to be empty and release all its
+ // dynamically allocated resources.
int get (T *&item, size_t slot = 0) const;
// Get the <slot>th element in the set. Returns -1 if the element
diff --git a/ace/Event_Handler.h b/ace/Event_Handler.h
index 3ed97652830..1f33a298f36 100644
--- a/ace/Event_Handler.h
+++ b/ace/Event_Handler.h
@@ -10,7 +10,7 @@
// Event_Handler.h
//
// = AUTHOR
-// Doug Schmidt
+// Douglas C. Schmidt <schmidt@cs.wustl.edu>
//
// ============================================================================
diff --git a/ace/OS.h b/ace/OS.h
index 51ed6162553..cbeae015024 100644
--- a/ace/OS.h
+++ b/ace/OS.h
@@ -56,6 +56,10 @@ enum ACE_Recyclable_State
#define ACE_DEFAULT_SERVICE_REPOSITORY_SIZE 1024
#endif /* ACE_DEFAULT_SERVICE_REPOSITORY_SIZE */
+#if !defined (ACE_REACTOR_NOTIFICATION_ARRAY_SIZE)
+#define ACE_REACTOR_NOTIFICATION_ARRAY_SIZE 1024
+#endif /* ACE_REACTOR_NOTIFICATION_ARRAY_SIZE */
+
// Do not change these values wantonly since GPERF depends on them..
#define ACE_ASCII_SIZE 128
#define ACE_EBCDIC_SIZE 256
diff --git a/ace/Process_Manager.cpp b/ace/Process_Manager.cpp
index 961d9b25345..111c2d19ef8 100644
--- a/ace/Process_Manager.cpp
+++ b/ace/Process_Manager.cpp
@@ -340,6 +340,7 @@ ACE_Process_Manager::handle_close (ACE_HANDLE handle,
ACE_Reactor_Mask)
{
ACE_TRACE ("ACE_Process_Manager::handle_close");
+ ACE_UNUSED_ARG (handle);
ACE_ASSERT (handle == this->dummy_handle_);
diff --git a/ace/Select_Reactor_Base.cpp b/ace/Select_Reactor_Base.cpp
index 670de5d78b9..017262e78ac 100644
--- a/ace/Select_Reactor_Base.cpp
+++ b/ace/Select_Reactor_Base.cpp
@@ -517,6 +517,22 @@ ACE_Select_Reactor_Notify::open (ACE_Reactor_Impl *r,
if (this->notification_pipe_.open () == -1)
return -1;
+#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
+ ACE_Notification_Buffer *temp;
+
+ ACE_NEW_RETURN (temp,
+ ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE],
+ -1);
+
+ if (this->alloc_set_.enqueue_head (temp) == -1)
+ return -1;
+
+ for (size_t i = 0; i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; i++)
+ if (free_set_.enqueue_head (temp + i) == -1)
+ return -1;
+
+#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
+
// There seems to be a Win32 bug with this... Set this into
// non-blocking mode.
if (ACE::set_flags (this->notification_pipe_.read_handle (),
@@ -539,6 +555,21 @@ int
ACE_Select_Reactor_Notify::close (void)
{
ACE_TRACE ("ACE_Select_Reactor_Notify::close");
+
+#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
+ // Free up the dynamically allocated resources.
+ ACE_Notification_Buffer **b;
+
+ for (ACE_Unbounded_Queue_Iterator<ACE_Notification_Buffer *> alloc_iter (this->alloc_set_);
+ alloc_iter.next (b) != 0;
+ alloc_iter.advance ())
+ delete [] *b;
+
+ this->alloc_set_.reset ();
+ this->notify_set_.reset ();
+ this->free_set_.reset ();
+#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
+
return this->notification_pipe_.close ();
}
@@ -555,6 +586,58 @@ ACE_Select_Reactor_Notify::notify (ACE_Event_Handler *eh,
return 0;
else
{
+#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
+ ACE_Notification_Buffer buffer (eh, mask);
+ int notification_required = 0;
+
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
+
+ // No pending notifications.
+ if (this->notify_set_.is_empty ())
+ notification_required = 1;
+
+ ACE_Notification_Buffer *temp = 0;
+
+ if (free_set_.dequeue_head (temp) == -1)
+ {
+ // Grow the queue of available buffers.
+ ACE_Notification_Buffer *temp1;
+
+ ACE_NEW_RETURN (temp1,
+ ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE],
+ -1);
+
+ if (this->alloc_set_.enqueue_head (temp1) == -1)
+ return -1;
+
+ // Start at 1 and enqueue only
+ // (ACE_REACTOR_NOTIFICATION_ARRAY_SIZE - 1) elements since
+ // the first one will be used right now.
+ for (size_t i = 1;
+ i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE;
+ i++)
+ this->free_set_.enqueue_head (temp1 + i);
+
+ temp = temp1;
+ }
+
+ ACE_ASSERT (temp != 0);
+ *temp = buffer;
+
+ if (notify_set_.enqueue_tail (temp) == -1)
+ return -1;
+
+ if (notification_required)
+ {
+ ssize_t n = ACE::send (this->notification_pipe_.write_handle (),
+ (char *) &buffer,
+ sizeof buffer,
+ timeout);
+ if (n == -1)
+ return -1;
+ }
+ return 0;
+#else
ACE_Notification_Buffer buffer (eh, mask);
ssize_t n = ACE::send (this->notification_pipe_.write_handle (),
@@ -565,6 +648,7 @@ ACE_Select_Reactor_Notify::notify (ACE_Event_Handler *eh,
return -1;
else
return 0;
+#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
}
}
@@ -624,11 +708,68 @@ ACE_Select_Reactor_Notify::handle_input (ACE_HANDLE handle)
return -1;
}
+#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
+ // Dispatch all messages that are in the <notify_set_>.
+ for (;;)
+ {
+ {
+ // We acquire the lock in a block to make sure we're not
+ // holding the lock while delivering callbacks...
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
+
+ ACE_Notification_Buffer *temp;
+
+ if (notify_set_.is_empty ())
+ break;
+ else if (notify_set_.dequeue_head (temp) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ASYS_TEXT ("%p\n"),
+ ASYS_TEXT ("dequeue_head")),
+ -1);
+ buffer = *temp;
+ if (free_set_.enqueue_head (temp) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ASYS_TEXT ("%p\n"),
+ ASYS_TEXT ("enqueue_head")),
+ -1);
+ }
+
+ // If eh == 0 then another thread is unblocking the
+ // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s
+ // internal structures. Otherwise, we need to dispatch the
+ // appropriate handle_* method on the <ACE_Event_Handler>
+ // pointer we've been passed.
+ if (buffer.eh_ != 0)
+ {
+ int result = 0;
+
+ switch (buffer.mask_)
+ {
+ case ACE_Event_Handler::READ_MASK:
+ case ACE_Event_Handler::ACCEPT_MASK:
+ result = buffer.eh_->handle_input (ACE_INVALID_HANDLE);
+ break;
+ case ACE_Event_Handler::WRITE_MASK:
+ result = buffer.eh_->handle_output (ACE_INVALID_HANDLE);
+ break;
+ case ACE_Event_Handler::EXCEPT_MASK:
+ result = buffer.eh_->handle_exception (ACE_INVALID_HANDLE);
+ break;
+ default:
+ // Should we bail out if we get an invalid mask?
+ ACE_ERROR ((LM_ERROR, ASYS_TEXT ("invalid mask = %d\n"), buffer.mask_));
+ }
+ if (result == -1)
+ buffer.eh_->handle_close (ACE_INVALID_HANDLE,
+ ACE_Event_Handler::EXCEPT_MASK);
+ }
+ }
+#else
// If eh == 0 then another thread is unblocking the
- // ACE_Select_Reactor to update the ACE_Select_Reactor's
+ // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s
// internal structures. Otherwise, we need to dispatch the
- // appropriate handle_* method on the ACE_Event_Handler pointer
- // we've been passed.
+ // appropriate handle_* method on the <ACE_Event_Handler>
+ // pointer we've been passed.
if (buffer.eh_ != 0)
{
int result = 0;
@@ -662,6 +803,7 @@ ACE_Select_Reactor_Notify::handle_input (ACE_HANDLE handle)
ACE_Event_Handler::EXCEPT_MASK);
}
+#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
number_dispatched++;
// Bail out if we've reached the <notify_threshold_>. Note that
@@ -679,7 +821,7 @@ ACE_Select_Reactor_Notify::handle_input (ACE_HANDLE handle)
// Enqueue ourselves into the list of waiting threads. When we
// reacquire the token we'll be off and running again with ownership
// of the token. The postcondition of this call is that
- // this->select_reactor_.token_.current_owner () == ACE_Thread::self ();
+ // <select_reactor_.token_.current_owner> == <ACE_Thread::self>.
this->select_reactor_->renew ();
return number_dispatched;
}
@@ -781,3 +923,17 @@ ACE_Select_Reactor_Impl::bit_ops (ACE_HANDLE handle,
}
return omask;
}
+
+#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
+#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
+template class ACE_Unbounded_Queue <ACE_Notification_Buffer *>;
+template class ACE_Unbounded_Queue_Iterator <ACE_Notification_Buffer *>;
+template class ACE_Node <ACE_Notification_Buffer *>;
+#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
+#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
+#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
+#pragma instantiate ACE_Unbounded_Queue <ACE_Notification_Buffer *>
+#pragma instantiate ACE_Unbounded_Queue_Iterator <ACE_Notification_Buffer *>
+#pragma instantiate ACE_Node <ACE_Notification_Buffer *>
+#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
diff --git a/ace/Select_Reactor_Base.h b/ace/Select_Reactor_Base.h
index 10e333cb144..45e87f3be84 100644
--- a/ace/Select_Reactor_Base.h
+++ b/ace/Select_Reactor_Base.h
@@ -169,6 +169,21 @@ private:
// dispatch the <ACE_Event_Handlers> that are passed in via the
// notify pipe before breaking out of its <recv> loop. By default,
// this is set to -1, which means "iterate until the pipe is empty."
+
+#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
+ ACE_Unbounded_Queue <ACE_Notification_Buffer *> alloc_set_;
+ // Keeps track of allocated arrays of type
+ // <ACE_Notification_Buffer>.
+
+ ACE_Unbounded_Queue <ACE_Notification_Buffer *> notify_set_;
+ // Keeps track of all pending notifications.
+
+ ACE_Unbounded_Queue <ACE_Notification_Buffer *> free_set_;
+ // Keeps track of all free buffers.
+
+ ACE_SYNCH_MUTEX notify_queue_lock_;
+ // synchronization for handling of queues
+#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
};
class ACE_Export ACE_Select_Reactor_Handler_Repository
diff --git a/tests/Reactor_Notify_Test.cpp b/tests/Reactor_Notify_Test.cpp
index 5879c6e1bef..50a4583400a 100644
--- a/tests/Reactor_Notify_Test.cpp
+++ b/tests/Reactor_Notify_Test.cpp
@@ -6,16 +6,18 @@
// tests
//
// = FILENAME
-// Reactors_Test.cpp
+// Reactor_Notify_Test.cpp
//
// = DESCRIPTION
// This is a test that illustrates how the <ACE_Reactor>'s <notify>
// method works under various <max_notify_iterations> settings.
// It also tests that the <disable_notify_pipe> option works
-// correctly.
+// correctly. Moreover, if the $ACE_ROOT/ace/config.h file has
+// the ACE_HAS_REACTOR_NOTIFICATION_QUEUE option enabled this
+// test will also exercise this feature.
//
// = AUTHOR
-// Douglas C. Schmidt
+// Douglas C. Schmidt <schmidt@cs.wustl.edu>
//
// ============================================================================
@@ -23,6 +25,7 @@
#include "ace/Synch.h"
#include "ace/Task.h"
#include "ace/Pipe.h"
+#include "ace/Auto_Ptr.h"
#include "ace/Select_Reactor.h"
ACE_RCSID(tests, Reactor_Notify_Test, "$Id$")
@@ -34,10 +37,14 @@ USELIB("..\ace\aced.lib");
#if defined (ACE_HAS_THREADS)
+static const int LONG_TIMEOUT = 10;
+static const int SHORT_TIMEOUT = 2;
+
class Supplier_Task : public ACE_Task<ACE_MT_SYNCH>
{
public:
- Supplier_Task (int disable_notify_pipe);
+ Supplier_Task (int disable_notify_pipe,
+ const ACE_Time_Value &tv);
// Constructor.
~Supplier_Task (void);
@@ -62,6 +69,9 @@ public:
// illustrate the difference between "limited" and "unlimited"
// notification.
+ void release (void);
+ // Release the <waiter_>.
+
private:
int perform_notifications (int notifications);
// Perform the notifications.
@@ -71,17 +81,31 @@ private:
// <Reactor>'s notify mechanism.
ACE_Pipe pipe_;
- // We use this pipe just so we can get a handle that is always
- // "active."
+ // We use this pipe just to get a handle that is always "active,"
+ // i.e., the <ACE_Reactor> will always dispatch its <handle_output>
+ // method.
int disable_notify_pipe_;
// Keeps track of whether the notification pipe in the <ACE_Reactor>
// has been diabled or not.
+
+ int long_timeout_;
+ // Keeps track of whether we're running with a <LONG_TIMEOUT>, which
+ // is used for the ACE_HAS_REACTOR_NOTIFICATION_QUEUE portion of
+ // this test.
};
-Supplier_Task::Supplier_Task (int disable_notify_pipe)
+void
+Supplier_Task::release (void)
+{
+ this->waiter_.release ();
+}
+
+Supplier_Task::Supplier_Task (int disable_notify_pipe,
+ const ACE_Time_Value &tv)
: waiter_ (0), // Make semaphore "locked" by default.
- disable_notify_pipe_ (disable_notify_pipe)
+ disable_notify_pipe_ (disable_notify_pipe),
+ long_timeout_ (tv.sec () == LONG_TIMEOUT)
{
}
@@ -89,48 +113,59 @@ int
Supplier_Task::open (void *)
{
// Create the pipe.
- if (this->pipe_.open () == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ASYS_TEXT ("(%t) %p\n"),
- ASYS_TEXT ("open failed")),
- -1);
+ int result;
+
+ result = this->pipe_.open ();
+ ACE_ASSERT (result != -1);
+
// Register the pipe's write handle with the <Reactor> for writing.
// This should mean that it's always "active."
- else if (ACE_Reactor::instance ()->register_handler
- (this->pipe_.write_handle (),
- this,
- ACE_Event_Handler::WRITE_MASK) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ASYS_TEXT ("(%t) %p\n"),
- ASYS_TEXT ("register_handler failed")),
- -1);
+ if (long_timeout_ == 0)
+ {
+ result = ACE_Reactor::instance ()->register_handler
+ (this->pipe_.write_handle (),
+ this,
+ ACE_Event_Handler::WRITE_MASK);
+ ACE_ASSERT (result != -1);
+ }
+
// Make this an Active Object.
- else if (this->activate (THR_BOUND | THR_DETACHED) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ASYS_TEXT ("(%t) %p\n"),
- ASYS_TEXT ("activate failed")),
- -1);
- else
- return 0;
+ result = this->activate (THR_BOUND | THR_DETACHED);
+ ACE_ASSERT (result != -1);
+ return 0;
}
int
Supplier_Task::close (u_long)
{
- ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("(%t) Supplier_Task::close\n")));
-
- if (ACE_Reactor::instance ()->remove_handler
- (this->pipe_.write_handle (),
- ACE_Event_Handler::WRITE_MASK) == -1)
- ACE_ERROR ((LM_ERROR,
- ASYS_TEXT ("(%t) %p\n"),
- ASYS_TEXT ("remove_handler failed")));
+ ACE_DEBUG ((LM_DEBUG,
+ ASYS_TEXT ("(%t) Supplier_Task::close\n")));
+
+ int result;
+
+ if (long_timeout_ == 0)
+ {
+ result = ACE_Reactor::instance ()->remove_handler
+ (this->pipe_.write_handle (),
+ ACE_Event_Handler::WRITE_MASK);
+ ACE_ASSERT (result != -1);
+ }
+ else
+ {
+ // Wait to be told to shutdown by the main thread.
+
+ ACE_DEBUG ((LM_DEBUG,
+ ASYS_TEXT ("(%t) waiting to be shutdown by main thread\n")));
+ result = this->waiter_.acquire ();
+ ACE_ASSERT (result != -1);
+ }
return 0;
}
Supplier_Task::~Supplier_Task (void)
{
- ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("(%t) ~Supplier_Task\n")));
+ ACE_DEBUG ((LM_DEBUG,
+ ASYS_TEXT ("(%t) ~Supplier_Task\n")));
this->pipe_.close ();
}
@@ -139,26 +174,41 @@ Supplier_Task::perform_notifications (int notifications)
{
ACE_Reactor::instance ()->max_notify_iterations (notifications);
- for (size_t i = 0; i < ACE_MAX_ITERATIONS; i++)
+ int iterations = ACE_MAX_ITERATIONS;
+
+ if (this->long_timeout_)
+ iterations *= (iterations * iterations * iterations);
+
+ for (size_t i = 0; i < iterations; i++)
{
ACE_DEBUG ((LM_DEBUG,
- ASYS_TEXT ("(%t) notifying reactor\n")));
+ ASYS_TEXT ("(%t) notifying reactor on iteration %d\n"),
+ i));
+
+ int result;
+
// Notify the Reactor, which will call <handle_exception>.
- if (ACE_Reactor::instance ()->notify (this) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ASYS_TEXT ("(%t) %p\n"),
- ASYS_TEXT ("notify")),
- -1);
+ result = ACE_Reactor::instance ()->notify (this);
+ if (result == -1)
+ {
+ if (errno == ETIME)
+ ACE_DEBUG ((LM_DEBUG,
+ ASYS_TEXT ("(%t) %p\n"),
+ ASYS_TEXT ("notify")));
+ else
+ ACE_ASSERT (result = -1);
+ }
// Wait for our <handle_exception> method to release the
// semaphore.
- else if (this->disable_notify_pipe_ == 0
- && this->waiter_.acquire () == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ASYS_TEXT ("(%t) %p\n"),
- ASYS_TEXT ("acquire")),
- -1);
+ if (this->long_timeout_ == 0
+ && this->disable_notify_pipe_ == 0)
+ {
+ result = this->waiter_.acquire ();
+ ACE_ASSERT (result != -1);
+ }
}
+
return 0;
}
@@ -170,19 +220,19 @@ Supplier_Task::svc (void)
// Allow an unlimited number of iterations per
// <ACE_Reactor::notify>.
- if (this->perform_notifications (-1) == -1)
- return -1;
-
- ACE_DEBUG ((LM_DEBUG,
- ASYS_TEXT ("(%t) **** starting limited notifications test\n")));
-
- // Only allow 1 iteration per <ACE_Reactor::notify>
+ this->perform_notifications (-1);
- if (this->perform_notifications (1) == -1)
- return -1;
+ if (this->long_timeout_ == 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ASYS_TEXT ("(%t) **** starting limited notifications test\n")));
- ACE_DEBUG ((LM_DEBUG,
- ASYS_TEXT ("(%t) **** exiting thread test\n")));
+ // Only allow 1 iteration per <ACE_Reactor::notify>
+ this->perform_notifications (1);
+
+ ACE_DEBUG ((LM_DEBUG,
+ ASYS_TEXT ("(%t) **** exiting thread test\n")));
+ }
return 0;
}
@@ -205,75 +255,103 @@ Supplier_Task::handle_output (ACE_HANDLE handle)
ASYS_TEXT ("(%t) handle_output\n")));
// This function is called by the main thread, believe it or not :-)
- // That's because the pipe's write handle is always active. So,
- // give the <Supplier_Task> a chance to run.
+ // That's because the pipe's write handle is always active. Thus,
+ // we can give the <Supplier_Task> a chance to run in its own
+ // thread.
ACE_OS::thr_yield ();
return 0;
}
-static void
-run_test (int disable_notify_pipe)
+static int
+run_test (int disable_notify_pipe,
+ const ACE_Time_Value &tv)
{
// Create special reactors with the appropriate flags enabled.
ACE_Select_Reactor *reactor_impl = 0;
if (disable_notify_pipe)
- ACE_NEW (reactor_impl,
- ACE_Select_Reactor (0, 0, 1));
+ ACE_NEW_RETURN (reactor_impl,
+ ACE_Select_Reactor (0, 0, 1),
+ -1);
else
- ACE_NEW (reactor_impl,
- ACE_Select_Reactor);
+ ACE_NEW_RETURN (reactor_impl,
+ ACE_Select_Reactor,
+ -1);
ACE_Reactor *reactor;
- ACE_NEW (reactor,
- ACE_Reactor (reactor_impl));
+ ACE_NEW_RETURN (reactor,
+ ACE_Reactor (reactor_impl),
+ -1);
+
+ // Make sure this stuff gets cleaned up when this function exits.
+ auto_ptr<ACE_Reactor> r (reactor);
+ auto_ptr<ACE_Select_Reactor> ri (reactor_impl);
// Set the Singleton Reactor.
ACE_Reactor::instance (reactor);
-
ACE_ASSERT (ACE_LOG_MSG->op_status () != -1);
ACE_ASSERT (ACE_Reactor::instance () == reactor);
- Supplier_Task task (disable_notify_pipe);
+ Supplier_Task task (disable_notify_pipe,
+ tv);
ACE_ASSERT (ACE_LOG_MSG->op_status () != -1);
- if (task.open () == -1)
- ACE_ERROR ((LM_ERROR,
- ASYS_TEXT ("(%t) open failed\n")));
- else
+ int result;
+
+ result = task.open ();
+ ACE_ASSERT (result != -1);
+
+ if (tv.sec () == LONG_TIMEOUT)
+ // Sleep for a while so that the <ACE_Reactor>'s notification
+ // buffers will fill up!
+ ACE_OS::sleep (tv);
+
+ int shutdown = 0;
+
+ // Run the event loop that handles the <handle_output> and
+ // <handle_exception> notifications.
+ for (int iteration = 1;
+ shutdown == 0;
+ iteration++)
{
- int shutdown = 0;
+ ACE_Time_Value timeout (tv);
+
+ ACE_DEBUG ((LM_DEBUG,
+ ASYS_TEXT ("(%t) starting handle_events() on iteration %d")
+ ASYS_TEXT (" with timeout = %d seconds\n"),
+ iteration,
+ timeout.sec ()));
- // Run the event loop that handles the <handle_output> and
- // <handle_exception> notifications.
- for (int iteration = 1; shutdown == 0; iteration++)
+ // Use a timeout to inform the Reactor when to shutdown.
+ switch (ACE_Reactor::instance ()->handle_events (timeout))
{
- ACE_Time_Value timeout (2);
-
- // Use a timeout to inform the Reactor when to shutdown.
- switch (ACE_Reactor::instance ()->handle_events (timeout))
- {
- case -1:
- ACE_ERROR ((LM_ERROR,
- ASYS_TEXT ("(%t) %p\n"),
- ASYS_TEXT ("reactor")));
- shutdown = 1;
- break;
- /* NOTREACHED */
- case 0:
- shutdown = 1;
- break;
- /* NOTREACHED */
- default:
- break;
- /* NOTREACHED */
- }
+ case -1:
+ ACE_ERROR ((LM_ERROR,
+ ASYS_TEXT ("(%t) %p\n"),
+ ASYS_TEXT ("reactor")));
+ shutdown = 1;
+ break;
+ /* NOTREACHED */
+ case 0:
+ ACE_DEBUG ((LM_DEBUG,
+ ASYS_TEXT ("(%t) handle_events timed out\n")));
+ shutdown = 1;
+ break;
+ /* NOTREACHED */
+ default:
+ break;
+ /* NOTREACHED */
}
}
- delete reactor_impl;
- delete reactor;
+ if (tv.sec () == LONG_TIMEOUT)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ASYS_TEXT ("(%t) releasing supplier task thread\n")));
+ task.release ();
+ }
+ return 0;
}
#endif /* ACE_HAS_THREADS */
@@ -284,12 +362,26 @@ main (int, ASYS_TCHAR *[])
ACE_START_TEST (ASYS_TEXT ("Reactor_Notify_Test"));
#if defined (ACE_HAS_THREADS)
+ ACE_Time_Value timeout (SHORT_TIMEOUT);
+ ACE_DEBUG ((LM_DEBUG,
+ ASYS_TEXT ("(%t) running tests with notify pipe enabled")
+ ASYS_TEXT (" and timeout = %d seconds\n"),
+ timeout.sec ()));
+ run_test (0, timeout);
+
ACE_DEBUG ((LM_DEBUG,
- ASYS_TEXT ("(%t) running tests with notify pipe enabled\n")));
- run_test (0);
+ ASYS_TEXT ("(%t) running tests with notify pipe diabled")
+ ASYS_TEXT (" and timeout = %d seconds\n"),
+ timeout.sec ()));
+ run_test (1, timeout);
+
+ timeout.set (LONG_TIMEOUT, 0);
ACE_DEBUG ((LM_DEBUG,
- ASYS_TEXT ("(%t) running tests with notify pipe disabled\n")));
- run_test (1);
+ ASYS_TEXT ("(%t) running tests with reactor notification pipe enabled\n")
+ ASYS_TEXT (" and timeout = %d seconds\n"),
+ timeout.sec ()));
+ run_test (0, timeout);
+
#else
ACE_ERROR ((LM_INFO,
ASYS_TEXT ("threads not supported on this platform\n")));
@@ -297,3 +389,11 @@ main (int, ASYS_TCHAR *[])
ACE_END_TEST;
return 0;
}
+
+#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
+template class auto_ptr<ACE_Reactor>;
+template class auto_ptr<ACE_Select_Reactor>;
+#else
+#pragma instantiate auto_ptr<ACE_Reactor>
+#pragma instantiate auto_ptr<ACE_Select_Reactor>
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */