summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorschmidt <douglascraigschmidt@users.noreply.github.com>2000-01-18 03:18:18 +0000
committerschmidt <douglascraigschmidt@users.noreply.github.com>2000-01-18 03:18:18 +0000
commit948d6bfec7f0b11d5ea06b8c71bc7f076832fdb3 (patch)
tree8f082fc22f230edfd8169803c4be9524245e70a3
parent40460185f7e4b1126335c841366a14ef9ecb1837 (diff)
downloadATCD-948d6bfec7f0b11d5ea06b8c71bc7f076832fdb3.tar.gz
ChangeLogTag:Mon Jan 17 18:03:17 2000 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu>
-rw-r--r--ChangeLog32
-rw-r--r--ChangeLogs/ChangeLog-02a32
-rw-r--r--ChangeLogs/ChangeLog-03a32
-rw-r--r--ace/Message_Queue.h26
-rw-r--r--ace/Message_Queue_T.h2
-rw-r--r--ace/Task_T.cpp13
-rw-r--r--ace/Task_T.h11
-rw-r--r--examples/Threads/thread_pool.cpp39
-rw-r--r--tests/Thread_Pool_Test.cpp341
9 files changed, 396 insertions, 132 deletions
diff --git a/ChangeLog b/ChangeLog
index daebfc7eb2f..db870295d79 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,33 @@
+Mon Jan 17 18:03:17 2000 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu>
+
+ * tests/Thread_Pool_Test.cpp: Enhanced this test to illustrate how
+ to shut down Tasks using either the "empty message" strategy or
+ the "queue deactivation" strategy.
+
+ * ace/Message_Queue.h: Updated the documentation of the enqueue*()
+ and dequeue*() methods to clarify which errno values are set
+ when the calls return -1.
+
+ * examples/Threads/thread_pool.cpp: Updated this example to
+ remove the use of the now-defunct "wait_for_threads_to_shutdown"
+ feature of ACE_Task.
+
+ * ace/Task_T: Removed the recent feature added on
+
+ Sat Jan 8 09:44:51 2000 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu>
+
+ that allowed to an ACE_Task's destructor to wait for threads in
+ a task to exit. It turns out this is practically impossible to
+ use correctly because of the way that destructors are destroyed
+ from the "top down". However, it's trivial to get the same
+ behavior by simply calling the Tasks's wait() method whenever
+ you want to implement barrier synchronization on a Task's thread
+ exits.
+
+ * tests/Thread_Pool_Test.cpp: Updated this test to illustrate
+ various strategies to wait for threads to exit. Thanks to Mark
+ C. Barnes <marcus@muse3d.com> for motivating this example.
+
Mon Jan 17 14:20:17 2000 Nanbor Wang <nanbor@cs.wustl.edu>
* ace/OS.h:
@@ -273,7 +303,7 @@ Sun Jan 9 00:25:58 2000 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu>
reporting this problem with VxWorks.
* ace/Pair_T.h: Reformatted to conform to the ACE style.
-
+
Sat Jan 8 09:44:51 2000 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu>
* examples/Threads/thread_pool.cpp: Revised the example to
diff --git a/ChangeLogs/ChangeLog-02a b/ChangeLogs/ChangeLog-02a
index daebfc7eb2f..db870295d79 100644
--- a/ChangeLogs/ChangeLog-02a
+++ b/ChangeLogs/ChangeLog-02a
@@ -1,3 +1,33 @@
+Mon Jan 17 18:03:17 2000 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu>
+
+ * tests/Thread_Pool_Test.cpp: Enhanced this test to illustrate how
+ to shut down Tasks using either the "empty message" strategy or
+ the "queue deactivation" strategy.
+
+ * ace/Message_Queue.h: Updated the documentation of the enqueue*()
+ and dequeue*() methods to clarify which errno values are set
+ when the calls return -1.
+
+ * examples/Threads/thread_pool.cpp: Updated this example to
+ remove the use of the now-defunct "wait_for_threads_to_shutdown"
+ feature of ACE_Task.
+
+ * ace/Task_T: Removed the recent feature added on
+
+ Sat Jan 8 09:44:51 2000 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu>
+
+ that allowed to an ACE_Task's destructor to wait for threads in
+ a task to exit. It turns out this is practically impossible to
+ use correctly because of the way that destructors are destroyed
+ from the "top down". However, it's trivial to get the same
+ behavior by simply calling the Tasks's wait() method whenever
+ you want to implement barrier synchronization on a Task's thread
+ exits.
+
+ * tests/Thread_Pool_Test.cpp: Updated this test to illustrate
+ various strategies to wait for threads to exit. Thanks to Mark
+ C. Barnes <marcus@muse3d.com> for motivating this example.
+
Mon Jan 17 14:20:17 2000 Nanbor Wang <nanbor@cs.wustl.edu>
* ace/OS.h:
@@ -273,7 +303,7 @@ Sun Jan 9 00:25:58 2000 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu>
reporting this problem with VxWorks.
* ace/Pair_T.h: Reformatted to conform to the ACE style.
-
+
Sat Jan 8 09:44:51 2000 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu>
* examples/Threads/thread_pool.cpp: Revised the example to
diff --git a/ChangeLogs/ChangeLog-03a b/ChangeLogs/ChangeLog-03a
index daebfc7eb2f..db870295d79 100644
--- a/ChangeLogs/ChangeLog-03a
+++ b/ChangeLogs/ChangeLog-03a
@@ -1,3 +1,33 @@
+Mon Jan 17 18:03:17 2000 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu>
+
+ * tests/Thread_Pool_Test.cpp: Enhanced this test to illustrate how
+ to shut down Tasks using either the "empty message" strategy or
+ the "queue deactivation" strategy.
+
+ * ace/Message_Queue.h: Updated the documentation of the enqueue*()
+ and dequeue*() methods to clarify which errno values are set
+ when the calls return -1.
+
+ * examples/Threads/thread_pool.cpp: Updated this example to
+ remove the use of the now-defunct "wait_for_threads_to_shutdown"
+ feature of ACE_Task.
+
+ * ace/Task_T: Removed the recent feature added on
+
+ Sat Jan 8 09:44:51 2000 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu>
+
+ that allowed to an ACE_Task's destructor to wait for threads in
+ a task to exit. It turns out this is practically impossible to
+ use correctly because of the way that destructors are destroyed
+ from the "top down". However, it's trivial to get the same
+ behavior by simply calling the Tasks's wait() method whenever
+ you want to implement barrier synchronization on a Task's thread
+ exits.
+
+ * tests/Thread_Pool_Test.cpp: Updated this test to illustrate
+ various strategies to wait for threads to exit. Thanks to Mark
+ C. Barnes <marcus@muse3d.com> for motivating this example.
+
Mon Jan 17 14:20:17 2000 Nanbor Wang <nanbor@cs.wustl.edu>
* ace/OS.h:
@@ -273,7 +303,7 @@ Sun Jan 9 00:25:58 2000 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu>
reporting this problem with VxWorks.
* ace/Pair_T.h: Reformatted to conform to the ACE style.
-
+
Sat Jan 8 09:44:51 2000 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu>
* examples/Threads/thread_pool.cpp: Revised the example to
diff --git a/ace/Message_Queue.h b/ace/Message_Queue.h
index 75632ba2273..d028bcb999c 100644
--- a/ace/Message_Queue.h
+++ b/ace/Message_Queue.h
@@ -63,27 +63,35 @@ public:
// = Enqueue and dequeue methods.
- // For the following enqueue and dequeue methods if <timeout> == 0,
- // the caller will block until action is possible, else will wait
- // until the absolute time specified in *<timeout> elapses). These
- // calls will return, however, when queue is closed, deactivated,
- // when a signal occurs, or if the time specified in timeout
- // elapses, (in which case errno = EWOULDBLOCK).
+ // For the following enqueue and dequeue methods, the caller will
+ // block until action is possible if <timeout> == 0. Otherwise, it
+ // will wait until the absolute time specified in *<timeout>
+ // elapses. These calls will -1 when queue is closed, deactivated
+ // (in which case <errno> == <ESHUTDOWN>), when a signal occurs (in
+ // which case <errno> == <EINTR>, or if the time specified in
+ // timeout elapses (in which case <errno> == <EWOULDBLOCK>).
virtual int enqueue_tail (ACE_Message_Block *new_item,
ACE_Time_Value *timeout = 0) = 0;
+ // Enqueue a <ACE_Message_Block *> into the tail of the queue.
+ // Returns number of items in queue if the call succeeds or -1
+ // otherwise.
virtual int enqueue (ACE_Message_Block *new_item,
ACE_Time_Value *timeout = 0) = 0;
// Enqueue a <ACE_Message_Block *> into the tail of the queue.
- // Return -1 on failure, number of items in queue otherwise.
+ // Returns number of items in queue if the call succeeds or -1
+ // otherwise.
virtual int dequeue_head (ACE_Message_Block *&first_item,
ACE_Time_Value *timeout = 0) = 0;
+ // Dequeue and return the <ACE_Message_Block *> at the head of the
+ // queue. Returns number of items in queue if the call succeeds or
+ // -1 otherwise.
virtual int dequeue (ACE_Message_Block *&first_item,
ACE_Time_Value *timeout = 0) = 0;
// Dequeue and return the <ACE_Message_Block *> at the head of the
- // queue. Returns -1 on failure, else the number of items still on
- // the queue.
+ // queue. Returns number of items in queue if the call succeeds or
+ // -1 otherwise.
// = Check if queue is full/empty.
virtual int is_full (void) = 0;
diff --git a/ace/Message_Queue_T.h b/ace/Message_Queue_T.h
index 6543244d54a..31af1dea39a 100644
--- a/ace/Message_Queue_T.h
+++ b/ace/Message_Queue_T.h
@@ -10,7 +10,7 @@
// Message_Queue_T.h
//
// = AUTHOR
-// Doug Schmidt
+// Douglas C. Schmidt <schmidt@cs.wustl.edu>
//
// ============================================================================
diff --git a/ace/Task_T.cpp b/ace/Task_T.cpp
index 88a9354396b..17c4612e67e 100644
--- a/ace/Task_T.cpp
+++ b/ace/Task_T.cpp
@@ -45,14 +45,12 @@ ACE_Task<ACE_SYNCH_USE>::dump (void) const
template<ACE_SYNCH_DECL>
ACE_Task<ACE_SYNCH_USE>::ACE_Task (ACE_Thread_Manager *thr_man,
- ACE_Message_Queue<ACE_SYNCH_USE> *mq,
- int wait_for_threads_in_destructor)
+ ACE_Message_Queue<ACE_SYNCH_USE> *mq)
: ACE_Task_Base (thr_man),
msg_queue_ (0),
delete_msg_queue_ (0),
mod_ (0),
- next_ (0),
- wait_for_threads_in_destructor_ (wait_for_threads_in_destructor)
+ next_ (0)
{
ACE_TRACE ("ACE_Task<ACE_SYNCH_USE>::ACE_Task");
@@ -70,13 +68,6 @@ template<ACE_SYNCH_DECL>
ACE_Task<ACE_SYNCH_USE>::~ACE_Task (void)
{
ACE_TRACE ("ACE_Task<ACE_SYNCH_USE>::~ACE_Task");
- // Wait for any and all threads in this task to exit.
- if (this->wait_for_threads_in_destructor_ != 0
- && this->wait () == -1)
- ACE_ERROR ((LM_ERROR,
- ASYS_TEXT ("(%t) %p\n"),
- ASYS_TEXT ("wait() failed in ~ACE_Task_Base")));
-
if (this->delete_msg_queue_)
delete this->msg_queue_;
diff --git a/ace/Task_T.h b/ace/Task_T.h
index f0950e91a0e..369032ea953 100644
--- a/ace/Task_T.h
+++ b/ace/Task_T.h
@@ -45,14 +45,11 @@ public:
// = Initialization/termination methods.
ACE_Task (ACE_Thread_Manager *thr_mgr = 0,
- ACE_Message_Queue<ACE_SYNCH_USE> *mq = 0,
- int wait_for_threads_in_destructor = 0);
+ ACE_Message_Queue<ACE_SYNCH_USE> *mq = 0);
// Initialize a Task, supplying a thread manager and a message
// queue. If the user doesn't supply a ACE_Message_Queue pointer
// then we'll allocate one dynamically. Otherwise, we'll use the
- // one passed as a parameter. If <wait_for_threads_in_destructor>
- // is non-0 then <~ACE_Task> will wait for any threads in this task
- // to exit before returning.
+ // one passed as a parameter.
virtual ~ACE_Task (void);
// Destructor.
@@ -140,10 +137,6 @@ public: // Should be protected:
ACE_Task<ACE_SYNCH_USE> *next_;
// Pointer to adjacent ACE_Task.
- int wait_for_threads_in_destructor_;
- // If this is non-0 then wait for threads in this task to exit
- // before returning from the destructor.
-
void dump (void) const;
// Dump the state of an object.
diff --git a/examples/Threads/thread_pool.cpp b/examples/Threads/thread_pool.cpp
index 8bd09996ed8..166a0e3d1cc 100644
--- a/examples/Threads/thread_pool.cpp
+++ b/examples/Threads/thread_pool.cpp
@@ -1,11 +1,11 @@
// $Id$
-// This test program illustrates how the ACE task synchronization
-// mechanisms work in conjunction with the ACE_Task and the
-// ACE_Thread_Manager. If the <manual> flag is set input comes from
-// stdin until the user enters a return -- otherwise, the input is
-// generated automatically. All worker threads shutdown when they
-// receive a message block of length 0.
+// This test program illustrates how the <ACE_Task> synchronization
+// mechanisms work in conjunction with the <ACE_Thread_Manager>. If
+// the <manual> flag is set input comes from stdin until the user
+// enters a return -- otherwise, the input is generated automatically.
+// All worker threads shutdown when they receive a message block of
+// length 0.
//
// This code is original based on a test program written by Karlheinz
// Dorn <Karlheinz.Dorn@med.siemens.de>. It was modified to utilize
@@ -61,9 +61,7 @@ Thread_Pool::close (u_long)
Thread_Pool::Thread_Pool (ACE_Thread_Manager *thr_mgr,
int n_threads)
- : ACE_Task<ACE_MT_SYNCH> (thr_mgr,
- 0,
- 1) // Wait in destructor for threads to exit...
+ : ACE_Task<ACE_MT_SYNCH> (thr_mgr)
{
// Create the pool of worker threads.
if (this->activate (THR_NEW_LWP,
@@ -197,11 +195,11 @@ producer (Thread_Pool &thread_pool)
}
else
{
- // Send a shutdown message to the waiting threads and exit.
ACE_DEBUG ((LM_DEBUG,
"\n(%t) start loop, dump of task:\n"));
// thread_pool.dump ();
+ // Send a shutdown message to the waiting threads and exit.
for (int i = thread_pool.thr_count (); i > 0; i--)
{
ACE_DEBUG ((LM_DEBUG,
@@ -239,18 +237,19 @@ main (int argc, char *argv[])
argc,
n_threads));
- {
- // Create the worker tasks.
- Thread_Pool thread_pool (ACE_Thread_Manager::instance (),
- n_threads);
+ // Create the worker tasks.
+ Thread_Pool thread_pool (ACE_Thread_Manager::instance (),
+ n_threads);
- // Create work for the worker tasks to process in their own threads.
- producer (thread_pool);
+ // Create work for the worker tasks to process in their own threads.
+ producer (thread_pool);
- // Wait for all the threads to reach their exit point.
- ACE_DEBUG ((LM_DEBUG,
- "(%t) waiting for threads to exit in Thread_Pool destructor...\n"));
- }
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) waiting for threads to exit in Thread_Pool destructor...\n"));
+ // Wait for all the threads to reach their exit point.
+ if (thread_pool.wait () == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) wait() failed\n"),
+ 1);
ACE_DEBUG ((LM_DEBUG,
"(%t) destroying worker tasks and exiting...\n"));
diff --git a/tests/Thread_Pool_Test.cpp b/tests/Thread_Pool_Test.cpp
index a66f310040a..165d0d2ad99 100644
--- a/tests/Thread_Pool_Test.cpp
+++ b/tests/Thread_Pool_Test.cpp
@@ -9,13 +9,13 @@
// Thread_Pool_Test.cpp
//
// = DESCRIPTION
-// This test program illustrates how the ACE task synchronization
-// mechanisms and ACE_Message_Block reference counting works in
-// conjunction with the ACE_Task and the ACE_Thread_Manager. If
-// the manual flag is not set input comes from stdin until the
-// user enters a return. This stops all workers via a message
-// block of length 0. This shows an alternative way to shutdown
-// worker tasks compared to queue deactivate.
+// This test program illustrates how the <ACE_Task>
+// synchronization mechanisms work in conjunction with the
+// <ACE_Thread_Manager>. If the <manual> flag is set input comes
+// from stdin until the user enters a return -- otherwise, the
+// input is generated automatically. All worker threads shutdown
+// when (1) they receive a message block of length 0 or (2) the
+// queue is deactivated.
//
// = AUTHOR
// Karlheinz Dorn <Karlheinz.Dorn@med.siemens.de>,
@@ -39,32 +39,49 @@ USELIB("..\ace\aced.lib");
// Number of iterations to run the test.
static size_t n_iterations = 100;
+// Controls whether the input is generated "manually" or automatically.
+static int manual = 0;
+
class Thread_Pool : public ACE_Task<ACE_MT_SYNCH>
{
+ // = TITLE
+ // Defines a thread pool abstraction based on the <ACE_Task>.
public:
Thread_Pool (int n_threads);
// Create the thread pool containing <n_threads>.
~Thread_Pool (void);
+ // Destructor...
- virtual int open (void * = 0);
- // Produce the messages that are consumed by the threads in the
- // thread pool.
+ int test_queue_deactivation_shutdown (void);
+ // Activate the task's thread pool, produce the messages that are
+ // consumed by the threads in the thread pool, and demonstate how to
+ // shutdown using the <ACE_Message_Queue::deactivate> method.
+
+ int test_empty_message_shutdown (void);
+ // Activate the task's thread pool, produce the messages that are,
+ // produce the messages that are consumed by the threads in the
+ // thread pool, and demonstrate how to shutdown by enqueueing
+ // "empty" messages into the queue.
virtual int svc (void);
// Iterate <n_iterations> time printing off a message and "waiting"
// for all other threads to complete this iteration.
- virtual int put (ACE_Message_Block *mb, ACE_Time_Value *tv = 0);
+ virtual int put (ACE_Message_Block *mb,
+ ACE_Time_Value *tv = 0);
// Allows the producer to pass messages to the <Thread_Pool>.
private:
+ virtual int open (void * = 0);
+ // Spawn the threads in the pool.
+
virtual int close (u_long);
// Close hook.
ACE_Lock_Adapter<ACE_Thread_Mutex> lock_adapter_;
// Serialize access to <ACE_Message_Block> reference count, which
- // will be decremented from multiple threads.
+ // will be decremented by multiple threads.
int n_threads_;
// Number of threads to spawn.
@@ -78,7 +95,7 @@ int
Thread_Pool::close (u_long)
{
ACE_DEBUG ((LM_DEBUG,
- ASYS_TEXT ("(%t) close of worker\n")));
+ ASYS_TEXT ("(%t) worker thread closing down\n")));
return 0;
}
@@ -90,7 +107,8 @@ Thread_Pool::Thread_Pool (int n_threads)
// Simply enqueue the Message_Block into the end of the queue.
int
-Thread_Pool::put (ACE_Message_Block *mb, ACE_Time_Value *tv)
+Thread_Pool::put (ACE_Message_Block *mb,
+ ACE_Time_Value *tv)
{
return this->putq (mb, tv);
}
@@ -109,13 +127,27 @@ Thread_Pool::svc (void)
ACE_Message_Block *mb;
int result = this->getq (mb);
- ACE_ASSERT (result != -1);
+
+ ACE_ASSERT (result != -1 || errno == ESHUTDOWN);
+
+ if (result == -1 && errno == ESHUTDOWN)
+ {
+ // The queue has been deactivated, so let's bail out.
+ ACE_DEBUG ((LM_DEBUG,
+ ASYS_TEXT ("(%t) in iteration %d, queue len = %d, ")
+ ASYS_TEXT ("queue deactivated, exiting\n"),
+ count,
+ this->msg_queue ()->message_count ()));
+
+ break;
+ }
int length = mb->length ();
if (length > 0)
ACE_DEBUG ((LM_DEBUG,
- ASYS_TEXT ("(%t) in iteration %d, queue len = %d, length = %d, text = \"%*s\"\n"),
+ ASYS_TEXT ("(%t) in iteration %d, queue len = %d, ")
+ ASYS_TEXT ("length = %d, text = \"%*s\"\n"),
count,
this->msg_queue ()->message_count (),
length,
@@ -128,7 +160,8 @@ Thread_Pool::svc (void)
if (length == 0)
{
ACE_DEBUG ((LM_DEBUG,
- ASYS_TEXT ("(%t) in iteration %d, queue len = %d, got NULL message, exiting\n"),
+ ASYS_TEXT ("(%t) in iteration %d, queue len = %d, ")
+ ASYS_TEXT ("got \"empty\" message, exiting\n"),
count,
this->msg_queue ()->message_count ()));
break;
@@ -154,13 +187,30 @@ Thread_Pool::open (void *)
ASYS_TEXT ("%p\n"),
ASYS_TEXT ("activate failed")),
-1);
+ return 0;
+}
+
+// Activate the task's thread pool, produce the messages that are
+// consumed by the threads in the thread pool, and demonstate how to
+// shutdown using the <ACE_Message_Queue::deactivate> method.
+
+int
+Thread_Pool::test_queue_deactivation_shutdown (void)
+{
+ if (this->open () == -1)
+ return -1;
ACE_Message_Block *mb = 0;
+ // Run the main loop that generates messages and enqueues them into
+ // the pool of threads managed by <ACE_Task>.
+
for (size_t count = 0;
- count < n_iterations;
+ ;
count++)
{
+ ssize_t n = 0;
+
// Allocate a new message.
ACE_NEW_RETURN (mb,
ACE_Message_Block (BUFSIZ,
@@ -171,67 +221,178 @@ Thread_Pool::open (void *)
&this->lock_adapter_),
-1);
- ACE_OS::sprintf ((ASYS_TCHAR *) mb->rd_ptr (),
- ASYS_TEXT ("%d\n"),
- count);
- int n = ACE_OS::strlen ((ASYS_TCHAR *) mb->rd_ptr ());
+ if (manual)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) enter a new message for the task pool..."));
+ n = ACE_OS::read (ACE_STDIN,
+ mb->rd_ptr (),
+ mb->size ());
+ }
+ else
+ {
+ static int count = 0;
+
+ ACE_OS::sprintf (mb->rd_ptr (),
+ "%d\n",
+ count);
+ n = ACE_OS::strlen (mb->rd_ptr ());
- if (count == 0 || (count % 20 == 0))
- ACE_OS::sleep (1);
+ if (count == n_iterations)
+ n = 1; // Indicate that we need to shut down.
+ else
+ count++;
- // Send a normal message to the waiting threads and continue
- // producing.
- mb->wr_ptr (n * sizeof (ASYS_TCHAR));
+ if (count == 0 || (count % 20 == 0))
+ ACE_OS::sleep (1);
+ }
- // Pass the message to the Thread_Pool.
- if (this->put (mb) == -1)
- ACE_ERROR ((LM_ERROR,
- ASYS_TEXT (" (%t) %p\n"),
- ASYS_TEXT ("put")));
+ if (n > 1)
+ {
+ // Send a normal message to the waiting threads and continue
+ // producing.
+ mb->wr_ptr (n * sizeof (ASYS_TCHAR));
+
+ // Pass the message to the Thread_Pool.
+ if (this->put (mb) == -1)
+ ACE_ERROR ((LM_ERROR,
+ ASYS_TEXT (" (%t) %p\n"),
+ ASYS_TEXT ("put")));
+ }
+ else
+ {
+ // Deactivate the message queue and return.
+ ACE_DEBUG ((LM_DEBUG,
+ ASYS_TEXT ("\n(%t) deactivating queue for %d threads, ")
+ ASYS_TEXT ("dump of task:\n"),
+ this->thr_count ()));
+ this->dump ();
+
+ // Deactivate the queue.
+ return this->msg_queue ()->deactivate ();
+ }
}
+}
- // Send a shutdown message to the waiting threads and exit.
- ACE_DEBUG ((LM_DEBUG,
- ASYS_TEXT ("\n(%t) sending shutdown message to %d threads, dump of task:\n"),
- this->thr_count ()));
- this->dump ();
+// Activate the task's thread pool, produce the messages that are,
+// produce the messages that are consumed by the threads in the thread
+// pool, and demonstrate how to shutdown by enqueueing "empty"
+// messages into the queue.
+
+int
+Thread_Pool::test_empty_message_shutdown (void)
+{
+ if (this->open () == -1)
+ return -1;
+
+ ACE_Message_Block *mb = 0;
+
+ // Run the main loop that generates messages and enqueues them into
+ // the pool of threads managed by <ACE_Task>.
- ACE_NEW_RETURN (mb,
- ACE_Message_Block (0,
- ACE_Message_Block::MB_DATA,
- 0,
- 0,
- 0,
- &this->lock_adapter_),
- -1);
- int i = 0;
-
- for (i = this->thr_count ();
- i > 0;
- i--)
+ for (size_t count = 0;
+ ;
+ count++)
{
- ACE_DEBUG ((LM_DEBUG,
- ASYS_TEXT ("(%t) EOF, enqueueing NULL block for thread = %d\n"),
- i));
-
- // Enqueue an empty message to flag each consumer to shutdown.
- // Note that we use reference counting to avoid having to copy
- // the message.
- ACE_Message_Block *dup = mb->duplicate ();
-
- if (this->put (dup) == -1)
- ACE_ERROR ((LM_ERROR,
- ASYS_TEXT (" (%t) %p\n"),
- ASYS_TEXT ("put")));
- }
+ ssize_t n = 0;
- mb->release ();
+ // Allocate a new message.
+ ACE_NEW_RETURN (mb,
+ ACE_Message_Block (BUFSIZ,
+ ACE_Message_Block::MB_DATA,
+ 0,
+ 0,
+ 0,
+ &this->lock_adapter_),
+ -1);
- ACE_DEBUG ((LM_DEBUG,
- ASYS_TEXT ("\n(%t) end loop, dump of task:\n")));
- this->dump ();
+ if (manual)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) enter a new message for the task pool..."));
+ n = ACE_OS::read (ACE_STDIN,
+ mb->rd_ptr (),
+ mb->size ());
+ }
+ else
+ {
+ static int count = 0;
- return 0;
+ ACE_OS::sprintf (mb->rd_ptr (),
+ "%d\n",
+ count);
+ n = ACE_OS::strlen (mb->rd_ptr ());
+
+ if (count == n_iterations)
+ n = 1; // Indicate that we need to shut down.
+ else
+ count++;
+
+ if (count == 0 || (count % 20 == 0))
+ ACE_OS::sleep (1);
+ }
+
+ if (n > 1)
+ {
+ // Send a normal message to the waiting threads and continue
+ // producing.
+ mb->wr_ptr (n * sizeof (ASYS_TCHAR));
+
+ // Pass the message to the Thread_Pool.
+ if (this->put (mb) == -1)
+ ACE_ERROR ((LM_ERROR,
+ ASYS_TEXT (" (%t) %p\n"),
+ ASYS_TEXT ("put")));
+ }
+ else
+ {
+ // Send a shutdown message to the waiting threads and return.
+ ACE_DEBUG ((LM_DEBUG,
+ ASYS_TEXT ("\n(%t) sending shutdown message to %d threads, ")
+ ASYS_TEXT ("dump of task:\n"),
+ this->thr_count ()));
+ this->dump ();
+
+ ACE_NEW_RETURN (mb,
+ ACE_Message_Block (0,
+ ACE_Message_Block::MB_DATA,
+ 0,
+ 0,
+ 0,
+ &this->lock_adapter_),
+ -1);
+ int i = 0;
+
+ // Enqueue an empty message to flag each consumer thread to
+ // inform it to shutdown.
+ for (i = this->thr_count ();
+ i > 0;
+ i--)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ASYS_TEXT ("(%t) end of input, ")
+ ASYS_TEXT ("enqueueing \"empty\" message %d\n"),
+ i));
+
+ // Note the use of reference counting to avoid copying
+ // the message contents.
+ ACE_Message_Block *dup = mb->duplicate ();
+
+ if (this->put (dup) == -1)
+ ACE_ERROR ((LM_ERROR,
+ ASYS_TEXT (" (%t) %p\n"),
+ ASYS_TEXT ("put")));
+ }
+
+ mb->release ();
+
+ ACE_DEBUG ((LM_DEBUG,
+ ASYS_TEXT ("\n(%t) end loop, dump of task:\n")));
+ this->dump ();
+
+ return 0;
+ }
+ }
}
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
@@ -250,28 +411,50 @@ main (int, ASYS_TCHAR *[])
#if defined (ACE_HAS_THREADS)
int n_threads = ACE_MAX_THREADS;
+ // Create the worker tasks.
+ Thread_Pool thread_pool (n_threads);
+
ACE_DEBUG ((LM_DEBUG,
- ASYS_TEXT ("(%t) threads = %d\n"),
+ ASYS_TEXT ("(%t) running test with %d threads\n"),
n_threads));
- // Create the worker tasks.
- Thread_Pool thread_pool (n_threads);
+ ACE_DEBUG ((LM_DEBUG,
+ ASYS_TEXT ("(%t) starting empty message shutdown test\n")));
- // Create work for the worker tasks to process in their own threads.
- if (thread_pool.open () == -1)
+ // Activate the task's thread pool, produce the messages that are,
+ // produce the messages that are consumed by the threads in the
+ // thread pool, and demonstrate how to shutdown by enqueueing
+ // "empty" messages into the queue.
+ if (thread_pool.test_empty_message_shutdown () == -1)
return 1;
- // Wait for all the threads to reach their exit point.
-
ACE_DEBUG ((LM_DEBUG,
ASYS_TEXT ("(%t) waiting for worker tasks to finish...\n")));
+ // Wait for all the threads to reach their exit point, at which
+ // point the barrier in the destructor of the <ACE_Task> portion of
+ // <Thread_Pool> will return.
+ if (thread_pool.wait () == -1)
+ return 1;
+
+ ACE_DEBUG ((LM_DEBUG,
+ ASYS_TEXT ("(%t) starting queue deactivation shutdown test\n")));
- ACE_Thread_Manager::instance ()->wait ();
+ // Activate the task's thread pool, produce the messages that are
+ // consumed by the threads in the thread pool, and demonstate how to
+ // shutdown using the <ACE_Message_Queue::deactivate> method.
+ if (thread_pool.test_queue_deactivation_shutdown () == -1)
+ return 1;
- ACE_ASSERT (thread_pool.msg_queue ()->is_empty ());
ACE_DEBUG ((LM_DEBUG,
- ASYS_TEXT ("(%t) destroying worker tasks and exiting...\n")));
+ ASYS_TEXT ("(%t) waiting for worker tasks to finish...\n")));
+ // Wait for all the threads to reach their exit point, at which
+ // point the barrier in the destructor of the <ACE_Task> portion of
+ // <Thread_Pool> will return.
+ if (thread_pool.wait () == -1)
+ return 1;
+ ACE_DEBUG ((LM_DEBUG,
+ ASYS_TEXT ("(%t) all worker tasks destroyed, exiting test...\n")));
#else
ACE_ERROR ((LM_INFO,
ASYS_TEXT ("threads not supported on this platform\n")));