summaryrefslogtreecommitdiff
path: root/tests/Thread_Pool_Test.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'tests/Thread_Pool_Test.cpp')
-rw-r--r--tests/Thread_Pool_Test.cpp341
1 files changed, 262 insertions, 79 deletions
diff --git a/tests/Thread_Pool_Test.cpp b/tests/Thread_Pool_Test.cpp
index a66f310040a..165d0d2ad99 100644
--- a/tests/Thread_Pool_Test.cpp
+++ b/tests/Thread_Pool_Test.cpp
@@ -9,13 +9,13 @@
// Thread_Pool_Test.cpp
//
// = DESCRIPTION
-// This test program illustrates how the ACE task synchronization
-// mechanisms and ACE_Message_Block reference counting works in
-// conjunction with the ACE_Task and the ACE_Thread_Manager. If
-// the manual flag is not set input comes from stdin until the
-// user enters a return. This stops all workers via a message
-// block of length 0. This shows an alternative way to shutdown
-// worker tasks compared to queue deactivate.
+// This test program illustrates how the <ACE_Task>
+// synchronization mechanisms work in conjunction with the
+// <ACE_Thread_Manager>. If the <manual> flag is set input comes
+// from stdin until the user enters a return -- otherwise, the
+// input is generated automatically. All worker threads shutdown
+// when (1) they receive a message block of length 0 or (2) the
+// queue is deactivated.
//
// = AUTHOR
// Karlheinz Dorn <Karlheinz.Dorn@med.siemens.de>,
@@ -39,32 +39,49 @@ USELIB("..\ace\aced.lib");
// Number of iterations to run the test.
static size_t n_iterations = 100;
+// Controls whether the input is generated "manually" or automatically.
+static int manual = 0;
+
class Thread_Pool : public ACE_Task<ACE_MT_SYNCH>
{
+ // = TITLE
+ // Defines a thread pool abstraction based on the <ACE_Task>.
public:
Thread_Pool (int n_threads);
// Create the thread pool containing <n_threads>.
~Thread_Pool (void);
+ // Destructor...
- virtual int open (void * = 0);
- // Produce the messages that are consumed by the threads in the
- // thread pool.
+ int test_queue_deactivation_shutdown (void);
+ // Activate the task's thread pool, produce the messages that are
+ // consumed by the threads in the thread pool, and demonstate how to
+ // shutdown using the <ACE_Message_Queue::deactivate> method.
+
+ int test_empty_message_shutdown (void);
+ // Activate the task's thread pool, produce the messages that are,
+ // produce the messages that are consumed by the threads in the
+ // thread pool, and demonstrate how to shutdown by enqueueing
+ // "empty" messages into the queue.
virtual int svc (void);
// Iterate <n_iterations> time printing off a message and "waiting"
// for all other threads to complete this iteration.
- virtual int put (ACE_Message_Block *mb, ACE_Time_Value *tv = 0);
+ virtual int put (ACE_Message_Block *mb,
+ ACE_Time_Value *tv = 0);
// Allows the producer to pass messages to the <Thread_Pool>.
private:
+ virtual int open (void * = 0);
+ // Spawn the threads in the pool.
+
virtual int close (u_long);
// Close hook.
ACE_Lock_Adapter<ACE_Thread_Mutex> lock_adapter_;
// Serialize access to <ACE_Message_Block> reference count, which
- // will be decremented from multiple threads.
+ // will be decremented by multiple threads.
int n_threads_;
// Number of threads to spawn.
@@ -78,7 +95,7 @@ int
Thread_Pool::close (u_long)
{
ACE_DEBUG ((LM_DEBUG,
- ASYS_TEXT ("(%t) close of worker\n")));
+ ASYS_TEXT ("(%t) worker thread closing down\n")));
return 0;
}
@@ -90,7 +107,8 @@ Thread_Pool::Thread_Pool (int n_threads)
// Simply enqueue the Message_Block into the end of the queue.
int
-Thread_Pool::put (ACE_Message_Block *mb, ACE_Time_Value *tv)
+Thread_Pool::put (ACE_Message_Block *mb,
+ ACE_Time_Value *tv)
{
return this->putq (mb, tv);
}
@@ -109,13 +127,27 @@ Thread_Pool::svc (void)
ACE_Message_Block *mb;
int result = this->getq (mb);
- ACE_ASSERT (result != -1);
+
+ ACE_ASSERT (result != -1 || errno == ESHUTDOWN);
+
+ if (result == -1 && errno == ESHUTDOWN)
+ {
+ // The queue has been deactivated, so let's bail out.
+ ACE_DEBUG ((LM_DEBUG,
+ ASYS_TEXT ("(%t) in iteration %d, queue len = %d, ")
+ ASYS_TEXT ("queue deactivated, exiting\n"),
+ count,
+ this->msg_queue ()->message_count ()));
+
+ break;
+ }
int length = mb->length ();
if (length > 0)
ACE_DEBUG ((LM_DEBUG,
- ASYS_TEXT ("(%t) in iteration %d, queue len = %d, length = %d, text = \"%*s\"\n"),
+ ASYS_TEXT ("(%t) in iteration %d, queue len = %d, ")
+ ASYS_TEXT ("length = %d, text = \"%*s\"\n"),
count,
this->msg_queue ()->message_count (),
length,
@@ -128,7 +160,8 @@ Thread_Pool::svc (void)
if (length == 0)
{
ACE_DEBUG ((LM_DEBUG,
- ASYS_TEXT ("(%t) in iteration %d, queue len = %d, got NULL message, exiting\n"),
+ ASYS_TEXT ("(%t) in iteration %d, queue len = %d, ")
+ ASYS_TEXT ("got \"empty\" message, exiting\n"),
count,
this->msg_queue ()->message_count ()));
break;
@@ -154,13 +187,30 @@ Thread_Pool::open (void *)
ASYS_TEXT ("%p\n"),
ASYS_TEXT ("activate failed")),
-1);
+ return 0;
+}
+
+// Activate the task's thread pool, produce the messages that are
+// consumed by the threads in the thread pool, and demonstate how to
+// shutdown using the <ACE_Message_Queue::deactivate> method.
+
+int
+Thread_Pool::test_queue_deactivation_shutdown (void)
+{
+ if (this->open () == -1)
+ return -1;
ACE_Message_Block *mb = 0;
+ // Run the main loop that generates messages and enqueues them into
+ // the pool of threads managed by <ACE_Task>.
+
for (size_t count = 0;
- count < n_iterations;
+ ;
count++)
{
+ ssize_t n = 0;
+
// Allocate a new message.
ACE_NEW_RETURN (mb,
ACE_Message_Block (BUFSIZ,
@@ -171,67 +221,178 @@ Thread_Pool::open (void *)
&this->lock_adapter_),
-1);
- ACE_OS::sprintf ((ASYS_TCHAR *) mb->rd_ptr (),
- ASYS_TEXT ("%d\n"),
- count);
- int n = ACE_OS::strlen ((ASYS_TCHAR *) mb->rd_ptr ());
+ if (manual)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) enter a new message for the task pool..."));
+ n = ACE_OS::read (ACE_STDIN,
+ mb->rd_ptr (),
+ mb->size ());
+ }
+ else
+ {
+ static int count = 0;
+
+ ACE_OS::sprintf (mb->rd_ptr (),
+ "%d\n",
+ count);
+ n = ACE_OS::strlen (mb->rd_ptr ());
- if (count == 0 || (count % 20 == 0))
- ACE_OS::sleep (1);
+ if (count == n_iterations)
+ n = 1; // Indicate that we need to shut down.
+ else
+ count++;
- // Send a normal message to the waiting threads and continue
- // producing.
- mb->wr_ptr (n * sizeof (ASYS_TCHAR));
+ if (count == 0 || (count % 20 == 0))
+ ACE_OS::sleep (1);
+ }
- // Pass the message to the Thread_Pool.
- if (this->put (mb) == -1)
- ACE_ERROR ((LM_ERROR,
- ASYS_TEXT (" (%t) %p\n"),
- ASYS_TEXT ("put")));
+ if (n > 1)
+ {
+ // Send a normal message to the waiting threads and continue
+ // producing.
+ mb->wr_ptr (n * sizeof (ASYS_TCHAR));
+
+ // Pass the message to the Thread_Pool.
+ if (this->put (mb) == -1)
+ ACE_ERROR ((LM_ERROR,
+ ASYS_TEXT (" (%t) %p\n"),
+ ASYS_TEXT ("put")));
+ }
+ else
+ {
+ // Deactivate the message queue and return.
+ ACE_DEBUG ((LM_DEBUG,
+ ASYS_TEXT ("\n(%t) deactivating queue for %d threads, ")
+ ASYS_TEXT ("dump of task:\n"),
+ this->thr_count ()));
+ this->dump ();
+
+ // Deactivate the queue.
+ return this->msg_queue ()->deactivate ();
+ }
}
+}
- // Send a shutdown message to the waiting threads and exit.
- ACE_DEBUG ((LM_DEBUG,
- ASYS_TEXT ("\n(%t) sending shutdown message to %d threads, dump of task:\n"),
- this->thr_count ()));
- this->dump ();
+// Activate the task's thread pool, produce the messages that are,
+// produce the messages that are consumed by the threads in the thread
+// pool, and demonstrate how to shutdown by enqueueing "empty"
+// messages into the queue.
+
+int
+Thread_Pool::test_empty_message_shutdown (void)
+{
+ if (this->open () == -1)
+ return -1;
+
+ ACE_Message_Block *mb = 0;
+
+ // Run the main loop that generates messages and enqueues them into
+ // the pool of threads managed by <ACE_Task>.
- ACE_NEW_RETURN (mb,
- ACE_Message_Block (0,
- ACE_Message_Block::MB_DATA,
- 0,
- 0,
- 0,
- &this->lock_adapter_),
- -1);
- int i = 0;
-
- for (i = this->thr_count ();
- i > 0;
- i--)
+ for (size_t count = 0;
+ ;
+ count++)
{
- ACE_DEBUG ((LM_DEBUG,
- ASYS_TEXT ("(%t) EOF, enqueueing NULL block for thread = %d\n"),
- i));
-
- // Enqueue an empty message to flag each consumer to shutdown.
- // Note that we use reference counting to avoid having to copy
- // the message.
- ACE_Message_Block *dup = mb->duplicate ();
-
- if (this->put (dup) == -1)
- ACE_ERROR ((LM_ERROR,
- ASYS_TEXT (" (%t) %p\n"),
- ASYS_TEXT ("put")));
- }
+ ssize_t n = 0;
- mb->release ();
+ // Allocate a new message.
+ ACE_NEW_RETURN (mb,
+ ACE_Message_Block (BUFSIZ,
+ ACE_Message_Block::MB_DATA,
+ 0,
+ 0,
+ 0,
+ &this->lock_adapter_),
+ -1);
- ACE_DEBUG ((LM_DEBUG,
- ASYS_TEXT ("\n(%t) end loop, dump of task:\n")));
- this->dump ();
+ if (manual)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) enter a new message for the task pool..."));
+ n = ACE_OS::read (ACE_STDIN,
+ mb->rd_ptr (),
+ mb->size ());
+ }
+ else
+ {
+ static int count = 0;
- return 0;
+ ACE_OS::sprintf (mb->rd_ptr (),
+ "%d\n",
+ count);
+ n = ACE_OS::strlen (mb->rd_ptr ());
+
+ if (count == n_iterations)
+ n = 1; // Indicate that we need to shut down.
+ else
+ count++;
+
+ if (count == 0 || (count % 20 == 0))
+ ACE_OS::sleep (1);
+ }
+
+ if (n > 1)
+ {
+ // Send a normal message to the waiting threads and continue
+ // producing.
+ mb->wr_ptr (n * sizeof (ASYS_TCHAR));
+
+ // Pass the message to the Thread_Pool.
+ if (this->put (mb) == -1)
+ ACE_ERROR ((LM_ERROR,
+ ASYS_TEXT (" (%t) %p\n"),
+ ASYS_TEXT ("put")));
+ }
+ else
+ {
+ // Send a shutdown message to the waiting threads and return.
+ ACE_DEBUG ((LM_DEBUG,
+ ASYS_TEXT ("\n(%t) sending shutdown message to %d threads, ")
+ ASYS_TEXT ("dump of task:\n"),
+ this->thr_count ()));
+ this->dump ();
+
+ ACE_NEW_RETURN (mb,
+ ACE_Message_Block (0,
+ ACE_Message_Block::MB_DATA,
+ 0,
+ 0,
+ 0,
+ &this->lock_adapter_),
+ -1);
+ int i = 0;
+
+ // Enqueue an empty message to flag each consumer thread to
+ // inform it to shutdown.
+ for (i = this->thr_count ();
+ i > 0;
+ i--)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ASYS_TEXT ("(%t) end of input, ")
+ ASYS_TEXT ("enqueueing \"empty\" message %d\n"),
+ i));
+
+ // Note the use of reference counting to avoid copying
+ // the message contents.
+ ACE_Message_Block *dup = mb->duplicate ();
+
+ if (this->put (dup) == -1)
+ ACE_ERROR ((LM_ERROR,
+ ASYS_TEXT (" (%t) %p\n"),
+ ASYS_TEXT ("put")));
+ }
+
+ mb->release ();
+
+ ACE_DEBUG ((LM_DEBUG,
+ ASYS_TEXT ("\n(%t) end loop, dump of task:\n")));
+ this->dump ();
+
+ return 0;
+ }
+ }
}
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
@@ -250,28 +411,50 @@ main (int, ASYS_TCHAR *[])
#if defined (ACE_HAS_THREADS)
int n_threads = ACE_MAX_THREADS;
+ // Create the worker tasks.
+ Thread_Pool thread_pool (n_threads);
+
ACE_DEBUG ((LM_DEBUG,
- ASYS_TEXT ("(%t) threads = %d\n"),
+ ASYS_TEXT ("(%t) running test with %d threads\n"),
n_threads));
- // Create the worker tasks.
- Thread_Pool thread_pool (n_threads);
+ ACE_DEBUG ((LM_DEBUG,
+ ASYS_TEXT ("(%t) starting empty message shutdown test\n")));
- // Create work for the worker tasks to process in their own threads.
- if (thread_pool.open () == -1)
+ // Activate the task's thread pool, produce the messages that are,
+ // produce the messages that are consumed by the threads in the
+ // thread pool, and demonstrate how to shutdown by enqueueing
+ // "empty" messages into the queue.
+ if (thread_pool.test_empty_message_shutdown () == -1)
return 1;
- // Wait for all the threads to reach their exit point.
-
ACE_DEBUG ((LM_DEBUG,
ASYS_TEXT ("(%t) waiting for worker tasks to finish...\n")));
+ // Wait for all the threads to reach their exit point, at which
+ // point the barrier in the destructor of the <ACE_Task> portion of
+ // <Thread_Pool> will return.
+ if (thread_pool.wait () == -1)
+ return 1;
+
+ ACE_DEBUG ((LM_DEBUG,
+ ASYS_TEXT ("(%t) starting queue deactivation shutdown test\n")));
- ACE_Thread_Manager::instance ()->wait ();
+ // Activate the task's thread pool, produce the messages that are
+ // consumed by the threads in the thread pool, and demonstate how to
+ // shutdown using the <ACE_Message_Queue::deactivate> method.
+ if (thread_pool.test_queue_deactivation_shutdown () == -1)
+ return 1;
- ACE_ASSERT (thread_pool.msg_queue ()->is_empty ());
ACE_DEBUG ((LM_DEBUG,
- ASYS_TEXT ("(%t) destroying worker tasks and exiting...\n")));
+ ASYS_TEXT ("(%t) waiting for worker tasks to finish...\n")));
+ // Wait for all the threads to reach their exit point, at which
+ // point the barrier in the destructor of the <ACE_Task> portion of
+ // <Thread_Pool> will return.
+ if (thread_pool.wait () == -1)
+ return 1;
+ ACE_DEBUG ((LM_DEBUG,
+ ASYS_TEXT ("(%t) all worker tasks destroyed, exiting test...\n")));
#else
ACE_ERROR ((LM_INFO,
ASYS_TEXT ("threads not supported on this platform\n")));