diff options
Diffstat (limited to 'tests/Thread_Pool_Test.cpp')
-rw-r--r-- | tests/Thread_Pool_Test.cpp | 341 |
1 files changed, 262 insertions, 79 deletions
diff --git a/tests/Thread_Pool_Test.cpp b/tests/Thread_Pool_Test.cpp index a66f310040a..165d0d2ad99 100644 --- a/tests/Thread_Pool_Test.cpp +++ b/tests/Thread_Pool_Test.cpp @@ -9,13 +9,13 @@ // Thread_Pool_Test.cpp // // = DESCRIPTION -// This test program illustrates how the ACE task synchronization -// mechanisms and ACE_Message_Block reference counting works in -// conjunction with the ACE_Task and the ACE_Thread_Manager. If -// the manual flag is not set input comes from stdin until the -// user enters a return. This stops all workers via a message -// block of length 0. This shows an alternative way to shutdown -// worker tasks compared to queue deactivate. +// This test program illustrates how the <ACE_Task> +// synchronization mechanisms work in conjunction with the +// <ACE_Thread_Manager>. If the <manual> flag is set input comes +// from stdin until the user enters a return -- otherwise, the +// input is generated automatically. All worker threads shutdown +// when (1) they receive a message block of length 0 or (2) the +// queue is deactivated. // // = AUTHOR // Karlheinz Dorn <Karlheinz.Dorn@med.siemens.de>, @@ -39,32 +39,49 @@ USELIB("..\ace\aced.lib"); // Number of iterations to run the test. static size_t n_iterations = 100; +// Controls whether the input is generated "manually" or automatically. +static int manual = 0; + class Thread_Pool : public ACE_Task<ACE_MT_SYNCH> { + // = TITLE + // Defines a thread pool abstraction based on the <ACE_Task>. public: Thread_Pool (int n_threads); // Create the thread pool containing <n_threads>. ~Thread_Pool (void); + // Destructor... - virtual int open (void * = 0); - // Produce the messages that are consumed by the threads in the - // thread pool. + int test_queue_deactivation_shutdown (void); + // Activate the task's thread pool, produce the messages that are + // consumed by the threads in the thread pool, and demonstate how to + // shutdown using the <ACE_Message_Queue::deactivate> method. + + int test_empty_message_shutdown (void); + // Activate the task's thread pool, produce the messages that are, + // produce the messages that are consumed by the threads in the + // thread pool, and demonstrate how to shutdown by enqueueing + // "empty" messages into the queue. virtual int svc (void); // Iterate <n_iterations> time printing off a message and "waiting" // for all other threads to complete this iteration. - virtual int put (ACE_Message_Block *mb, ACE_Time_Value *tv = 0); + virtual int put (ACE_Message_Block *mb, + ACE_Time_Value *tv = 0); // Allows the producer to pass messages to the <Thread_Pool>. private: + virtual int open (void * = 0); + // Spawn the threads in the pool. + virtual int close (u_long); // Close hook. ACE_Lock_Adapter<ACE_Thread_Mutex> lock_adapter_; // Serialize access to <ACE_Message_Block> reference count, which - // will be decremented from multiple threads. + // will be decremented by multiple threads. int n_threads_; // Number of threads to spawn. @@ -78,7 +95,7 @@ int Thread_Pool::close (u_long) { ACE_DEBUG ((LM_DEBUG, - ASYS_TEXT ("(%t) close of worker\n"))); + ASYS_TEXT ("(%t) worker thread closing down\n"))); return 0; } @@ -90,7 +107,8 @@ Thread_Pool::Thread_Pool (int n_threads) // Simply enqueue the Message_Block into the end of the queue. int -Thread_Pool::put (ACE_Message_Block *mb, ACE_Time_Value *tv) +Thread_Pool::put (ACE_Message_Block *mb, + ACE_Time_Value *tv) { return this->putq (mb, tv); } @@ -109,13 +127,27 @@ Thread_Pool::svc (void) ACE_Message_Block *mb; int result = this->getq (mb); - ACE_ASSERT (result != -1); + + ACE_ASSERT (result != -1 || errno == ESHUTDOWN); + + if (result == -1 && errno == ESHUTDOWN) + { + // The queue has been deactivated, so let's bail out. + ACE_DEBUG ((LM_DEBUG, + ASYS_TEXT ("(%t) in iteration %d, queue len = %d, ") + ASYS_TEXT ("queue deactivated, exiting\n"), + count, + this->msg_queue ()->message_count ())); + + break; + } int length = mb->length (); if (length > 0) ACE_DEBUG ((LM_DEBUG, - ASYS_TEXT ("(%t) in iteration %d, queue len = %d, length = %d, text = \"%*s\"\n"), + ASYS_TEXT ("(%t) in iteration %d, queue len = %d, ") + ASYS_TEXT ("length = %d, text = \"%*s\"\n"), count, this->msg_queue ()->message_count (), length, @@ -128,7 +160,8 @@ Thread_Pool::svc (void) if (length == 0) { ACE_DEBUG ((LM_DEBUG, - ASYS_TEXT ("(%t) in iteration %d, queue len = %d, got NULL message, exiting\n"), + ASYS_TEXT ("(%t) in iteration %d, queue len = %d, ") + ASYS_TEXT ("got \"empty\" message, exiting\n"), count, this->msg_queue ()->message_count ())); break; @@ -154,13 +187,30 @@ Thread_Pool::open (void *) ASYS_TEXT ("%p\n"), ASYS_TEXT ("activate failed")), -1); + return 0; +} + +// Activate the task's thread pool, produce the messages that are +// consumed by the threads in the thread pool, and demonstate how to +// shutdown using the <ACE_Message_Queue::deactivate> method. + +int +Thread_Pool::test_queue_deactivation_shutdown (void) +{ + if (this->open () == -1) + return -1; ACE_Message_Block *mb = 0; + // Run the main loop that generates messages and enqueues them into + // the pool of threads managed by <ACE_Task>. + for (size_t count = 0; - count < n_iterations; + ; count++) { + ssize_t n = 0; + // Allocate a new message. ACE_NEW_RETURN (mb, ACE_Message_Block (BUFSIZ, @@ -171,67 +221,178 @@ Thread_Pool::open (void *) &this->lock_adapter_), -1); - ACE_OS::sprintf ((ASYS_TCHAR *) mb->rd_ptr (), - ASYS_TEXT ("%d\n"), - count); - int n = ACE_OS::strlen ((ASYS_TCHAR *) mb->rd_ptr ()); + if (manual) + { + ACE_DEBUG ((LM_DEBUG, + "(%t) enter a new message for the task pool...")); + n = ACE_OS::read (ACE_STDIN, + mb->rd_ptr (), + mb->size ()); + } + else + { + static int count = 0; + + ACE_OS::sprintf (mb->rd_ptr (), + "%d\n", + count); + n = ACE_OS::strlen (mb->rd_ptr ()); - if (count == 0 || (count % 20 == 0)) - ACE_OS::sleep (1); + if (count == n_iterations) + n = 1; // Indicate that we need to shut down. + else + count++; - // Send a normal message to the waiting threads and continue - // producing. - mb->wr_ptr (n * sizeof (ASYS_TCHAR)); + if (count == 0 || (count % 20 == 0)) + ACE_OS::sleep (1); + } - // Pass the message to the Thread_Pool. - if (this->put (mb) == -1) - ACE_ERROR ((LM_ERROR, - ASYS_TEXT (" (%t) %p\n"), - ASYS_TEXT ("put"))); + if (n > 1) + { + // Send a normal message to the waiting threads and continue + // producing. + mb->wr_ptr (n * sizeof (ASYS_TCHAR)); + + // Pass the message to the Thread_Pool. + if (this->put (mb) == -1) + ACE_ERROR ((LM_ERROR, + ASYS_TEXT (" (%t) %p\n"), + ASYS_TEXT ("put"))); + } + else + { + // Deactivate the message queue and return. + ACE_DEBUG ((LM_DEBUG, + ASYS_TEXT ("\n(%t) deactivating queue for %d threads, ") + ASYS_TEXT ("dump of task:\n"), + this->thr_count ())); + this->dump (); + + // Deactivate the queue. + return this->msg_queue ()->deactivate (); + } } +} - // Send a shutdown message to the waiting threads and exit. - ACE_DEBUG ((LM_DEBUG, - ASYS_TEXT ("\n(%t) sending shutdown message to %d threads, dump of task:\n"), - this->thr_count ())); - this->dump (); +// Activate the task's thread pool, produce the messages that are, +// produce the messages that are consumed by the threads in the thread +// pool, and demonstrate how to shutdown by enqueueing "empty" +// messages into the queue. + +int +Thread_Pool::test_empty_message_shutdown (void) +{ + if (this->open () == -1) + return -1; + + ACE_Message_Block *mb = 0; + + // Run the main loop that generates messages and enqueues them into + // the pool of threads managed by <ACE_Task>. - ACE_NEW_RETURN (mb, - ACE_Message_Block (0, - ACE_Message_Block::MB_DATA, - 0, - 0, - 0, - &this->lock_adapter_), - -1); - int i = 0; - - for (i = this->thr_count (); - i > 0; - i--) + for (size_t count = 0; + ; + count++) { - ACE_DEBUG ((LM_DEBUG, - ASYS_TEXT ("(%t) EOF, enqueueing NULL block for thread = %d\n"), - i)); - - // Enqueue an empty message to flag each consumer to shutdown. - // Note that we use reference counting to avoid having to copy - // the message. - ACE_Message_Block *dup = mb->duplicate (); - - if (this->put (dup) == -1) - ACE_ERROR ((LM_ERROR, - ASYS_TEXT (" (%t) %p\n"), - ASYS_TEXT ("put"))); - } + ssize_t n = 0; - mb->release (); + // Allocate a new message. + ACE_NEW_RETURN (mb, + ACE_Message_Block (BUFSIZ, + ACE_Message_Block::MB_DATA, + 0, + 0, + 0, + &this->lock_adapter_), + -1); - ACE_DEBUG ((LM_DEBUG, - ASYS_TEXT ("\n(%t) end loop, dump of task:\n"))); - this->dump (); + if (manual) + { + ACE_DEBUG ((LM_DEBUG, + "(%t) enter a new message for the task pool...")); + n = ACE_OS::read (ACE_STDIN, + mb->rd_ptr (), + mb->size ()); + } + else + { + static int count = 0; - return 0; + ACE_OS::sprintf (mb->rd_ptr (), + "%d\n", + count); + n = ACE_OS::strlen (mb->rd_ptr ()); + + if (count == n_iterations) + n = 1; // Indicate that we need to shut down. + else + count++; + + if (count == 0 || (count % 20 == 0)) + ACE_OS::sleep (1); + } + + if (n > 1) + { + // Send a normal message to the waiting threads and continue + // producing. + mb->wr_ptr (n * sizeof (ASYS_TCHAR)); + + // Pass the message to the Thread_Pool. + if (this->put (mb) == -1) + ACE_ERROR ((LM_ERROR, + ASYS_TEXT (" (%t) %p\n"), + ASYS_TEXT ("put"))); + } + else + { + // Send a shutdown message to the waiting threads and return. + ACE_DEBUG ((LM_DEBUG, + ASYS_TEXT ("\n(%t) sending shutdown message to %d threads, ") + ASYS_TEXT ("dump of task:\n"), + this->thr_count ())); + this->dump (); + + ACE_NEW_RETURN (mb, + ACE_Message_Block (0, + ACE_Message_Block::MB_DATA, + 0, + 0, + 0, + &this->lock_adapter_), + -1); + int i = 0; + + // Enqueue an empty message to flag each consumer thread to + // inform it to shutdown. + for (i = this->thr_count (); + i > 0; + i--) + { + ACE_DEBUG ((LM_DEBUG, + ASYS_TEXT ("(%t) end of input, ") + ASYS_TEXT ("enqueueing \"empty\" message %d\n"), + i)); + + // Note the use of reference counting to avoid copying + // the message contents. + ACE_Message_Block *dup = mb->duplicate (); + + if (this->put (dup) == -1) + ACE_ERROR ((LM_ERROR, + ASYS_TEXT (" (%t) %p\n"), + ASYS_TEXT ("put"))); + } + + mb->release (); + + ACE_DEBUG ((LM_DEBUG, + ASYS_TEXT ("\n(%t) end loop, dump of task:\n"))); + this->dump (); + + return 0; + } + } } #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) @@ -250,28 +411,50 @@ main (int, ASYS_TCHAR *[]) #if defined (ACE_HAS_THREADS) int n_threads = ACE_MAX_THREADS; + // Create the worker tasks. + Thread_Pool thread_pool (n_threads); + ACE_DEBUG ((LM_DEBUG, - ASYS_TEXT ("(%t) threads = %d\n"), + ASYS_TEXT ("(%t) running test with %d threads\n"), n_threads)); - // Create the worker tasks. - Thread_Pool thread_pool (n_threads); + ACE_DEBUG ((LM_DEBUG, + ASYS_TEXT ("(%t) starting empty message shutdown test\n"))); - // Create work for the worker tasks to process in their own threads. - if (thread_pool.open () == -1) + // Activate the task's thread pool, produce the messages that are, + // produce the messages that are consumed by the threads in the + // thread pool, and demonstrate how to shutdown by enqueueing + // "empty" messages into the queue. + if (thread_pool.test_empty_message_shutdown () == -1) return 1; - // Wait for all the threads to reach their exit point. - ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("(%t) waiting for worker tasks to finish...\n"))); + // Wait for all the threads to reach their exit point, at which + // point the barrier in the destructor of the <ACE_Task> portion of + // <Thread_Pool> will return. + if (thread_pool.wait () == -1) + return 1; + + ACE_DEBUG ((LM_DEBUG, + ASYS_TEXT ("(%t) starting queue deactivation shutdown test\n"))); - ACE_Thread_Manager::instance ()->wait (); + // Activate the task's thread pool, produce the messages that are + // consumed by the threads in the thread pool, and demonstate how to + // shutdown using the <ACE_Message_Queue::deactivate> method. + if (thread_pool.test_queue_deactivation_shutdown () == -1) + return 1; - ACE_ASSERT (thread_pool.msg_queue ()->is_empty ()); ACE_DEBUG ((LM_DEBUG, - ASYS_TEXT ("(%t) destroying worker tasks and exiting...\n"))); + ASYS_TEXT ("(%t) waiting for worker tasks to finish...\n"))); + // Wait for all the threads to reach their exit point, at which + // point the barrier in the destructor of the <ACE_Task> portion of + // <Thread_Pool> will return. + if (thread_pool.wait () == -1) + return 1; + ACE_DEBUG ((LM_DEBUG, + ASYS_TEXT ("(%t) all worker tasks destroyed, exiting test...\n"))); #else ACE_ERROR ((LM_INFO, ASYS_TEXT ("threads not supported on this platform\n"))); |